asynq Jobs Guide¶
asynq 는 Umbra 의 단기 백그라운드 작업과 cron 스케줄러를 담당한다. Redis 를 broker 로 사용하며 Temporal 과 역할 분리되어 있다 (ADR-0015).
Why asynq¶
Temporal 은 장기/복잡한 workflow 에 최적화돼 있어 수 초짜리 작업에도 쓰면 과잉이다. asynq 는:
- 가볍다 — Redis 하나만 있으면 됨
- 빠르다 — 단일 task 처리에 수 ms
- Go-idiomatic — 표준 Go 패턴 (handler, middleware)
- Cron 지원 — 정기 실행 내장
반면 durable execution, signal, 복잡한 orchestration 은 부족 → 그건 Temporal 담당.
When to use¶
asynq 적합¶
- Cron — 결제 cron, 스냅샷 cron, retention cleanup, outbox archive
- 단기 async — 알림 발송, DM 발송, 이벤트 fanout
- Live Sync 배치 — 5초 간격 길드 버퍼 플러시
- Retry charge — 24h/48h/72h 재시도 delayed task
- Outbox poller — 이벤트 dispatch
Temporal 사용 (asynq 부적합)¶
- Restore workflow (단계별 Activity + retry + signal)
- AntiNuke (길드별 장기 running + rolling window)
상세 비교: ADR-0015.
Architecture¶
Redis broker¶
asynq 는 Redis 를 queue + scheduler storage 로 사용. Upstash Redis 사용 (프로덕션).
Workers¶
apps/worker 프로세스가 asynq 서버 (consumer) 실행:
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: cfg.RedisAddr},
asynq.Config{
Concurrency: 20,
Queues: map[string]int{
"critical": 5,
"default": 3,
"low": 1,
},
},
)
Scheduler (cron)¶
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{Addr: cfg.RedisAddr},
&asynq.SchedulerOpts{},
)
Enqueuer (client)¶
도메인 서비스가 task 생성 시 client 사용:
Task naming¶
Task type 은 domain:action 패턴:
billing:charge_recurring
billing:retry_charge
recovery:sync_flush
recovery:scheduled_snapshot
recovery:cleanup_retention
notification:send
outbox:poll
outbox:archive
도메인 prefix 로 소유권 명확. 변경 시 마이그레이션 주의.
Task handler pattern¶
// apps/worker/internal/jobs/billing.go
type BillingJobHandler struct {
svc billing.Service
logger *slog.Logger
}
func (h *BillingJobHandler) HandleChargeRecurring(ctx context.Context, t *asynq.Task) error {
var payload ChargeRecurringPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("unmarshal: %w", asynq.SkipRetry) // non-retryable
}
return h.svc.ProcessRecurringCharge(ctx, payload.SubscriptionID)
}
type ChargeRecurringPayload struct {
SubscriptionID uuid.UUID `json:"subscription_id"`
}
Error handling¶
- Retryable error — 그냥
return err(asynq 가 재시도 스케줄) - Non-retryable —
return fmt.Errorf("...: %w", asynq.SkipRetry)(retry 안 함) - Panic — asynq 가 recover 후 failed 로 기록
Task registration¶
// apps/worker/internal/jobs/mux.go
func NewMux(handlers *Handlers) *asynq.ServeMux {
mux := asynq.NewServeMux()
// Billing
mux.HandleFunc("billing:charge_recurring", handlers.Billing.HandleChargeRecurring)
mux.HandleFunc("billing:retry_charge", handlers.Billing.HandleRetryCharge)
// Recovery
mux.HandleFunc("recovery:sync_flush", handlers.Recovery.HandleSyncFlush)
mux.HandleFunc("recovery:scheduled_snapshot", handlers.Recovery.HandleScheduledSnapshot)
mux.HandleFunc("recovery:cleanup_retention", handlers.Recovery.HandleCleanupRetention)
// Notification
mux.HandleFunc("notification:send", handlers.Notification.HandleSend)
// Outbox
mux.HandleFunc("outbox:poll", handlers.Outbox.HandlePoll)
mux.HandleFunc("outbox:archive", handlers.Outbox.HandleArchive)
// Middleware
return middleware.Apply(mux, loggingMiddleware, metricsMiddleware)
}
Enqueue¶
Immediate¶
payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: id})
task := asynq.NewTask("billing:charge_recurring", payload)
_, err := client.Enqueue(task)
Delayed (specific time)¶
task := asynq.NewTask("billing:retry_charge", payload)
_, err := client.Enqueue(task, asynq.ProcessAt(time.Now().Add(24*time.Hour)))
With options¶
_, err := client.Enqueue(task,
asynq.Queue("critical"),
asynq.MaxRetry(3),
asynq.Timeout(5*time.Minute),
asynq.Retention(24*time.Hour), // completed task 보관 기간
asynq.Unique(time.Hour), // 같은 payload 중복 enqueue 방지 (1시간)
)
Cron scheduling¶
// apps/worker/internal/cron/register.go
func RegisterCrons(scheduler *asynq.Scheduler, client *asynq.Client) error {
// 정기 결제 cron — 매시 정각
if _, err := scheduler.Register("0 * * * *", asynq.NewTask("billing:schedule_due", nil)); err != nil {
return err
}
// 일일 스냅샷 — 매일 00:00 KST
if _, err := scheduler.Register("0 15 * * *", asynq.NewTask("recovery:schedule_daily_snapshot", nil)); err != nil {
// KST 00:00 = UTC 15:00
return err
}
// 일일 retention cleanup — 매일 04:00 KST
if _, err := scheduler.Register("0 19 * * *", asynq.NewTask("recovery:cleanup_retention", nil)); err != nil {
return err
}
// 만료 세션 cleanup — 매일 03:00 KST
if _, err := scheduler.Register("0 18 * * *", asynq.NewTask("identity:cleanup_sessions", nil)); err != nil {
return err
}
// Outbox archive — 매일 05:00 KST
if _, err := scheduler.Register("0 20 * * *", asynq.NewTask("outbox:archive", nil)); err != nil {
return err
}
return nil
}
Cron handler → task enqueue¶
Cron task 자체는 빠르게 끝나고 실제 처리는 별도 task 로:
func HandleScheduleDueCharges(ctx context.Context, t *asynq.Task) error {
subs, err := billingSvc.ListDueForCharge(ctx, time.Now().Add(time.Hour))
if err != nil { return err }
for _, s := range subs {
delay := time.Until(s.NextBillingAt)
payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: s.ID})
task := asynq.NewTask("billing:charge_recurring", payload)
client.Enqueue(task,
asynq.ProcessIn(delay),
asynq.MaxRetry(0),
asynq.Unique(delay + time.Minute),
)
}
return nil
}
이 패턴이 "cron 이 대량 작업을 직접 처리" 보다 안전.
Priority queues¶
Enqueue 시 queue 지정:
사용 예:
- critical — 결제 charge, AntiNuke 응답 관련 알림
- default — 일반 알림, Live Sync 배치
- low — retention cleanup, archive
Uniqueness¶
중복 enqueue 방지:
내부적으로 Redis key (asynq:unique:...) 에 TTL 로 저장. 동일 요청이 여러 번 들어와도 하나만 처리.
Cron 의 중복 실행 방지에도 유용.
Idempotency¶
asynq 의 retry 특성상 handler 는 idempotent:
func (h *Handler) HandleChargeRecurring(ctx context.Context, t *asynq.Task) error {
var payload ChargeRecurringPayload
json.Unmarshal(t.Payload(), &payload)
sub, err := h.svc.GetSubscription(ctx, payload.SubscriptionID)
if err != nil { return err }
// Idempotency check — 이미 이 cycle 이 결제됐으면 skip
if sub.LastChargedCycle == sub.CurrentCycle {
return nil
}
return h.svc.ProcessRecurringCharge(ctx, payload.SubscriptionID)
}
Or: DB 쪽에서 UNIQUE constraint (payment_attempts.order_id) 로 중복 거부.
Middleware¶
func loggingMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
err := next.ProcessTask(ctx, t)
dur := time.Since(start)
logger.Info("task processed",
"type", t.Type(),
"duration_ms", dur.Milliseconds(),
"error", err,
)
return err
})
}
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
tasksReceived.WithLabelValues(t.Type()).Inc()
start := time.Now()
err := next.ProcessTask(ctx, t)
taskDuration.WithLabelValues(t.Type()).Observe(time.Since(start).Seconds())
if err != nil {
tasksFailed.WithLabelValues(t.Type()).Inc()
}
return err
})
}
Dead letter handling¶
// asynq 자동: MaxRetry 초과 시 "archived" 상태로 이동
// 운영자가 asynqmon UI (web) 또는 CLI 로 확인/재큐
// 또는 ErrorHandler 로 외부 알림
srv := asynq.NewServer(opt, asynq.Config{
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, t *asynq.Task, err error) {
logger.Error("task failed",
"type", t.Type(),
"error", err,
"retry_count", asynq.GetRetryCount(ctx),
"max_retry", asynq.GetMaxRetry(ctx),
)
}),
})
Critical task (결제 관련) 의 dead letter 는 Slack 알림 연계 (Phase 2).
Graceful shutdown¶
현재 처리 중인 task 완료 대기 후 종료.
Fly.io 의 SIGTERM 핸들링:
func main() {
srv := asynq.NewServer(...)
go func() {
if err := srv.Run(mux); err != nil { log.Fatal(err) }
}()
// Wait for signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
<-sigs
srv.Shutdown()
}
Observability¶
asynqmon¶
asynq 의 공식 web UI. 로컬:
- Pending / active / completed / failed task 카운트
- Queue 별 통계
- Failed task payload + 에러 조회
Metrics¶
OpenTelemetry 로 export:
- Task processing duration (histogram)
- Tasks processed (counter, by type & status)
- Queue length (gauge)
Grafana Cloud 대시보드에서 시각화.
Logging¶
구조화된 필드:
task_typetask_idqueueretry_countduration_mserror(실패 시)
Testing¶
Unit test handler¶
func TestBillingHandler_ChargeRecurring(t *testing.T) {
mockSvc := &mockBillingService{chargeResult: ...}
h := &BillingJobHandler{svc: mockSvc, logger: testLogger}
payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: subID})
task := asynq.NewTask("billing:charge_recurring", payload)
err := h.HandleChargeRecurring(ctx, task)
assert.NoError(t, err)
assert.Len(t, mockSvc.chargeCalls, 1)
}
Integration test¶
func TestBillingWorker_Integration(t *testing.T) {
if testing.Short() { t.Skip() }
redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379", DB: 15}
client := asynq.NewClient(redisOpt)
defer client.Close()
// 실제 task enqueue
payload, _ := json.Marshal(...)
client.Enqueue(asynq.NewTask("billing:charge_recurring", payload))
// 실제 server 기동 + 완료 대기
srv := asynq.NewServer(redisOpt, ...)
mux := setupMux(realServices)
go srv.Run(mux)
defer srv.Shutdown()
// assertion (DB 상태 확인)
time.Sleep(2 * time.Second)
sub, _ := billingRepo.GetByID(ctx, subID)
assert.Equal(t, "active", sub.Status)
}
Do / Don't¶
Do¶
- ✅ Task type 에 도메인 prefix (
billing:,recovery:) - ✅ Handler idempotent
- ✅ Retry 가능한 / 불가능한 에러 구분
- ✅
asynq.Unique로 중복 방지 - ✅ Priority queue 활용
- ✅ Cron 은 간단히, 실제 처리는 enqueue 후 별도 task
- ✅ 구조화된 로깅
- ✅ Graceful shutdown
Don't¶
- ❌ asynq 에 장기 workflow 구현 (Temporal 사용)
- ❌ Non-idempotent handler
- ❌ Handler 에서 panic (recover 있지만 원칙적 X)
- ❌ Payload 에 sensitive data (암호화 후 ID 만 전달)
- ❌ 무한 retry (MaxRetry 명시)
See also¶
temporal-workflow.md— 장기 workflowoutbox-pattern.md— 이벤트 전달backend-conventions.md../adr/0007-cache-queue-redis.md../adr/0015-asynq-vs-temporal-split.md