A lightweight, auto-scaling queue for processing Go functions as jobs. Keep jobs simple, then compose behavior with wrappers for retries, timeouts, tracing, and more.
Inspired by Bull, Pond, Ants, and more.
- Auto-scaling worker pool (min/max workers)
- Composable job wrappers (retries, timeouts, backoffs, etc.)
- Priority queue with weighted dispatch
- Job scheduler for recurring and one-time jobs
- Pause/resume queue execution (local or distributed)
- Job metadata (ID, enqueue time, attempt count)
- Circuit breaker for fault tolerance
- Optional queue lifecycle hooks (enqueue/start/success/failure/reschedule)
- Queue-level middleware chain for all jobs
- Job tagging and batch tracking
- Overlap prevention and uniqueness constraints
- Workflow step checkpointing for retry-safe chains/dependencies
- Tracing hooks for observability
- Zero external dependencies for core functionality
Use this as a quick guide before diving into detailed sections.
| Capability | Primary APIs | What it solves |
|---|---|---|
| Queueing and workers | `NewQueue`, `Enqueue`, `Stop` | Run background jobs with auto-scaling workers |
| Reliability | `WithRetryPolicy`, `WithRetry`, `WithRetryIf`, `WithBackoff`, `WithRecover` | Handle transient failures and panic recovery |
| Time control | `WithTimeout`, `WithDeadline`, `DelayEnqueue` | Bound execution and schedule delayed runs |
| Flow orchestration | `WithChain`, `WithPipeline`, `WithBatch`, `WithDependsOn`, `WithCheckpoint` | Build multi-step and grouped workflows with configurable dependency failure modes |
| Concurrency safety | `WithoutOverlap`, `WithUnique`, `WithConcurrencyByKey` | Prevent overlap, deduplicate work, and limit concurrent execution per key |
| Deferral and release | `WithRelease`, `WithReleaseSelf`, `WithRateLimitRelease` | Re-enqueue instead of blocking workers |
| Rate and fault protection | `WithRateLimit`, `WithCircuitBreaker` | Protect upstream services under load and failure |
| Observability and outcomes | `WithTracing`, `WithOutcome`, `WithHooks`, `MetaFromContext`, `LastErrorFromContext` | Track attempts, prior retry errors, durations, and queue lifecycle transitions |
| Queue-wide wrappers | `WithMiddleware` | Apply cross-cutting behavior to every enqueued job |
| Multi-queue routing | `NewQueueManager`, `NewPriorityQueueManager`, `Register`, `Enqueue`, `DelayEnqueue`, `StartAll`, `StopAll` | Route standard or priority jobs to named queues with isolated worker pools |
| Prioritization and scheduling | `NewPriorityQueue`, `NewPriorityQueueManager`, `PriorityQueue.EnqueueOrError`, `PriorityQueue.TryEnqueueOrError`, `NewScheduler` | Prioritize urgent jobs, route them by name, and run recurring work with typed enqueue outcomes |
- Standalone: Process jobs in-memory without external infrastructure. Great for CLI tools, internal services, or cases where Redis/SQS is unnecessary.
- With external queues: Use cq as the execution engine behind SQS, Redis, RabbitMQ, or any broker that feeds jobs.
- With external persistence: Keep durability outside cq with DB outbox polling or queue-native retries/DLQ semantics.
- Embedded: Add background processing to an existing app without introducing new operational infrastructure.
```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"

	"github.com/gnikyt/cq"
)

func doWork(ctx context.Context) error {
	meta := cq.MetaFromContext(ctx)
	log.Printf("job %s started, queued %v ago", meta.ID, time.Since(meta.EnqueuedAt))
	time.Sleep(2 * time.Second)
	log.Printf("job %s completed", meta.ID)
	return nil
}

func main() {
	// Listen for interrupt signals.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Create queue with the signal context.
	queue := cq.NewQueue(1, 10, 100, cq.WithContext(ctx))
	queue.Start()

	// Enqueue work...
	queue.Enqueue(func(ctx context.Context) error {
		return doWork(ctx)
	})

	// Wait for shutdown signal.
	<-ctx.Done()

	// Stop queue, wait for in-flight jobs to finish.
	queue.Stop(true)
}
```

```go
highQ := cq.NewQueue(5, 50, 1000) // High-priority lane.
lowQ := cq.NewQueue(1, 5, 5000)   // Bulk/background lane.

mgr := cq.NewQueueManager()
if err := mgr.Register("high", highQ); err != nil {
	log.Fatal(err)
}
if err := mgr.Register("low", lowQ); err != nil {
	log.Fatal(err)
}
mgr.StartAll()
defer mgr.StopAll(true)

if err := mgr.Enqueue("high", processCritical); err != nil {
	log.Fatal(err)
}
if err := mgr.Enqueue("low", processBulk); err != nil {
	log.Fatal(err)
}
if err := mgr.DelayEnqueue("low", processLater, 30*time.Second); err != nil {
	log.Fatal(err)
}
```

```go
criticalBase := cq.NewQueue(5, 20, 500)
criticalBase.Start()
bulkBase := cq.NewQueue(2, 10, 1000)
bulkBase.Start()

pmgr := cq.NewPriorityQueueManager()
if err := pmgr.Register("critical", cq.NewPriorityQueue(criticalBase, 100)); err != nil {
	log.Fatal(err)
}
if err := pmgr.Register("bulk", cq.NewPriorityQueue(bulkBase, 200)); err != nil {
	log.Fatal(err)
}
defer pmgr.StopAll(true)

if err := pmgr.Enqueue("critical", processNow, cq.PriorityHighest); err != nil {
	log.Fatal(err)
}
if err := pmgr.DelayEnqueue("bulk", processLater, cq.PriorityLow, time.Minute); err != nil {
	log.Fatal(err)
}
```

Wrappers let you add behavior to jobs without modifying the job itself. Compose them from innermost to outermost: the outermost wrapper runs first and controls the flow. This keeps job logic clean while adding retries, timeouts, tracing, and error handling declaratively.
```go
job := cq.WithOutcome( // 3. Outermost: catches the final outcome.
	cq.WithRetryPolicy( // 2. Preferred retry wrapper.
		cq.WithTimeout( // 1. Innermost: runs with a timeout.
			actualJob,
			5*time.Minute,
		),
		cq.RetryPolicy{
			MaxAttempts: 3,
			Backoff:     cq.ExponentialBackoff,
		},
	),
	onComplete,
	onFail,
	onDiscard,
)
```

Execution flow:

1. `WithOutcome` calls `WithRetryPolicy`.
2. `WithRetryPolicy` calls `WithTimeout`.
3. `WithTimeout` runs `actualJob` with a 5-minute timeout.
4. If `actualJob` fails, control returns up the chain for retry logic.
5. After all retries, `WithOutcome` receives the final outcome.
`WithRetryPolicy` is the recommended default for retry behavior; `WithRetry`, `WithRetryIf`, and `WithBackoff` still exist for finer-grained manual composition. Reach for `WithRetryPolicy` first when you want practical defaults quickly.
```go
job := cq.WithRetryPolicy(
	cq.WithTimeout(fetchFromAPI, 10*time.Second),
	cq.RetryPolicy{
		MaxAttempts: 3,
		Backoff:     cq.ExponentialBackoff,
	},
)
queue.Enqueue(job)
```

Checkpoint a step so it is skipped on retry after its first success:

```go
store := cq.NewMemoryCheckpointStore()
step := cq.WithCheckpoint(
	sendInvoice,
	"send-invoice",
	store,
	cq.WithCheckpointNamespace("billing"),
)
job := cq.WithChain(
	validateOrder,
	step, // Will be skipped on retry after first success.
	notifyCustomer,
)
queue.Enqueue(job)
```

Deduplicate work with a uniqueness lock:

```go
locker := cq.NewUniqueMemoryLocker()
job := cq.WithUnique(
	cq.WithTimeout(processOrder, 30*time.Second),
	"order:123",
	5*time.Minute,
	locker,
)
queue.Enqueue(job)
```

Run recurring work with the scheduler:

```go
scheduler := cq.NewScheduler(context.Background(), queue)
defer scheduler.Stop()
_ = scheduler.Every("sync-products", 10*time.Minute, syncProductsJob)
```

Create, start, and stop a queue:

```go
queue := cq.NewQueue(1, 100, 1000)
queue.Start()
defer queue.Stop(true)
```

Parameters: `NewQueue(minWorkers, maxWorkers, capacity)`.
```go
// For normal jobs.
queue.Enqueue(job)                            // Blocking.
queue.EnqueueOrError(job)                     // Blocking, returns typed rejection error.
queue.TryEnqueue(job)                         // Non-blocking, returns bool.
queue.TryEnqueueOrError(job)                  // Non-blocking, returns typed (accepted, error).
queue.DelayEnqueue(job, 2*time.Minute)        // Delayed.
queue.EnqueueBatch(jobs)                      // Multiple jobs.
queue.DelayEnqueueBatch(jobs, 30*time.Second) // Delayed, multiple jobs.
```

Typed enqueue rejection errors: `cq.ErrQueueStopped`, `cq.ErrQueuePaused`, `cq.ErrQueueFull`.
Inspect queue state:

```go
queue.RunningWorkers()           // Current running workers.
queue.IdleWorkers()              // Current idle workers.
queue.Capacity()                 // Job channel capacity.
queue.WorkerRange()              // (min, max) workers.
queue.TallyOf(cq.JobStateFailed) // Count by state.

// Available job states for TallyOf:
// cq.JobStateCreated   - Total jobs accepted.
// cq.JobStatePending   - Jobs waiting in the queue.
// cq.JobStateActive    - Jobs currently executing.
// cq.JobStateFailed    - Jobs completed with error.
// cq.JobStateCompleted - Jobs completed successfully.
```

Construct a queue with options:

```go
queue := cq.NewQueue(1, 10, 100,
	cq.WithWorkerIdleTick(500*time.Millisecond),
	cq.WithContext(ctx),
	cq.WithPanicHandler(func(err any) {
		log.Printf("panic: %v", err)
	}),
)

// Available options:
// cq.WithWorkerIdleTick(d)          - Interval for idle worker cleanup (default 5s).
// cq.WithContext(ctx)               - Parent context for the queue.
// cq.WithCancelableContext(ctx, fn) - Parent context with custom cancel function.
// cq.WithPanicHandler(fn)           - Custom handler override for job panics.
// cq.WithIDGenerator(fn)            - Override fallback job ID generation.
// cq.WithPauseStore(store, key)     - Share pause state across queue instances.
// cq.WithPausePollTick(d)           - Poll interval for distributed pause sync.
// cq.WithPauseBehavior(mode)        - Buffer or reject enqueue while paused.
// cq.WithMiddleware(mw...)          - Apply queue-level wrappers to all jobs.
```

Apply middleware to every job:

```go
withLogging := func(next cq.Job) cq.Job {
	return func(ctx context.Context) error {
		log.Println("job start")
		err := next(ctx)
		log.Printf("job end: %v", err)
		return err
	}
}

queue := cq.NewQueue(1, 10, 100, cq.WithMiddleware(withLogging))
```

`WithMiddleware(a, b)` executes as `a(b(job))`: `a` runs first, then `b`. This lets you apply common middleware to every job sent to the queue instead of wrapping each job individually.
```go
if err := queue.Pause(); err != nil {
	log.Fatal(err)
}
// ... perform maintenance ...
if err := queue.Resume(); err != nil {
	log.Fatal(err)
}
```

Use `cq.WithPauseBehavior(cq.PauseReject)` if you prefer rejecting enqueues while paused.
```go
queue.Stop(true)  // Wait for jobs to finish.
queue.Stop(false) // Don't wait for queued jobs.
queue.Terminate() // Immediate shutdown.
```

For detailed usage and advanced features, see the following guides:
- Job Wrappers - Complete reference for all job wrappers including retries, timeouts, tracing, rate limiting, circuit breakers, and custom wrappers
- Queue Options - Queue configuration options including context, panic handling, hooks, and custom ID generation
- Priority Queue - Weighted fair queuing with custom priority levels and dispatch strategies
- Queue Routing - Register named queues and route jobs to isolated worker pools
- Scheduler - Recurring and one-time job scheduling with cron-like behavior
- Custom Locker - Distributed lock implementations for `WithUnique` and `WithoutOverlap`, with Redis and SQLite examples
- Custom Checkpoint Store - Distributed checkpoint implementations for `WithCheckpoint`, with Redis and SQLite examples
- Custom Key Concurrency Limiter - Distributed limiter implementations for `WithConcurrencyByKey`, with Redis and SQLite examples
```sh
make test
```

```
ok      github.com/gnikyt/cq    18.755s coverage: 90.0% of statements
```

```sh
make bench
```

```
cpu: Apple M5
BenchmarkScenarios/100Req--10kJobs-10          7   192443179 ns/op
BenchmarkScenarios/1kReq--1kJobs-10            7   194722393 ns/op
BenchmarkScenarios/10kReq--100Jobs-10          7   352322048 ns/op
BenchmarkSingleSteadyState-10            3063700       393.4 ns/op
```
```sh
go run example/web_direct.go
```

```sh
for i in {1..500}; do
  curl -s -X POST localhost:8080/order -d '{"demo":"yes"}' -H "Content-Type: application/json"
done
```