콘텐츠로 이동

asynq Jobs Guide

asynq 는 Umbra 의 단기 백그라운드 작업과 cron 스케줄러를 담당한다. Redis 를 broker 로 사용하며 Temporal 과 역할 분리되어 있다 (ADR-0015).

Why asynq

Temporal 은 장기/복잡한 workflow 에 최적화돼 있어 수 초짜리 작업에도 쓰면 과잉이다. asynq 는:

  • 가볍다 — Redis 하나만 있으면 됨
  • 빠르다 — 단일 task 처리에 수 ms
  • Go-idiomatic — 표준 Go 패턴 (handler, middleware)
  • Cron 지원 — 정기 실행 내장

반면 durable execution, signal, 복잡한 orchestration 은 부족 → 그건 Temporal 담당.

When to use

asynq 적합

  • Cron — 결제 cron, 스냅샷 cron, retention cleanup, outbox archive
  • 단기 async — 알림 발송, DM 발송, 이벤트 fanout
  • Live Sync 배치 — 5초 간격 길드 버퍼 플러시
  • Retry charge — 24h/48h/72h 재시도 delayed task
  • Outbox poller — 이벤트 dispatch

Temporal 사용 (asynq 부적합)

  • Restore workflow (단계별 Activity + retry + signal)
  • AntiNuke (길드별 장기 running + rolling window)

상세 비교: ADR-0015.

Architecture

Redis broker

asynq 는 Redis 를 queue + scheduler storage 로 사용. Upstash Redis 사용 (프로덕션).

Workers

apps/worker 프로세스가 asynq 서버 (consumer) 실행:

srv := asynq.NewServer(
    asynq.RedisClientOpt{Addr: cfg.RedisAddr},
    asynq.Config{
        Concurrency: 20,
        Queues: map[string]int{
            "critical": 5,
            "default":  3,
            "low":      1,
        },
    },
)

Scheduler (cron)

scheduler := asynq.NewScheduler(
    asynq.RedisClientOpt{Addr: cfg.RedisAddr},
    &asynq.SchedulerOpts{},
)

Enqueuer (client)

도메인 서비스가 task 생성 시 client 사용:

client := asynq.NewClient(asynq.RedisClientOpt{Addr: cfg.RedisAddr})

Task naming

Task type 은 domain:action 패턴:

billing:charge_recurring
billing:retry_charge
recovery:sync_flush
recovery:scheduled_snapshot
recovery:cleanup_retention
notification:send
outbox:poll
outbox:archive

도메인 prefix 로 소유권 명확. 변경 시 마이그레이션 주의.

Task handler pattern

// apps/worker/internal/jobs/billing.go
type BillingJobHandler struct {
    svc    billing.Service
    logger *slog.Logger
}

func (h *BillingJobHandler) HandleChargeRecurring(ctx context.Context, t *asynq.Task) error {
    var payload ChargeRecurringPayload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return fmt.Errorf("unmarshal: %w", asynq.SkipRetry)  // non-retryable
    }

    return h.svc.ProcessRecurringCharge(ctx, payload.SubscriptionID)
}

type ChargeRecurringPayload struct {
    SubscriptionID uuid.UUID `json:"subscription_id"`
}

Error handling

  • Retryable error — 그냥 return err (asynq 가 재시도 스케줄)
  • Non-retryablereturn fmt.Errorf("...: %w", asynq.SkipRetry) (retry 안 함)
  • Panic — asynq 가 recover 후 failed 로 기록

Task registration

// apps/worker/internal/jobs/mux.go
func NewMux(handlers *Handlers) *asynq.ServeMux {
    mux := asynq.NewServeMux()

    // Billing
    mux.HandleFunc("billing:charge_recurring", handlers.Billing.HandleChargeRecurring)
    mux.HandleFunc("billing:retry_charge", handlers.Billing.HandleRetryCharge)

    // Recovery
    mux.HandleFunc("recovery:sync_flush", handlers.Recovery.HandleSyncFlush)
    mux.HandleFunc("recovery:scheduled_snapshot", handlers.Recovery.HandleScheduledSnapshot)
    mux.HandleFunc("recovery:cleanup_retention", handlers.Recovery.HandleCleanupRetention)

    // Notification
    mux.HandleFunc("notification:send", handlers.Notification.HandleSend)

    // Outbox
    mux.HandleFunc("outbox:poll", handlers.Outbox.HandlePoll)
    mux.HandleFunc("outbox:archive", handlers.Outbox.HandleArchive)

    // Middleware
    return middleware.Apply(mux, loggingMiddleware, metricsMiddleware)
}

Enqueue

Immediate

payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: id})
task := asynq.NewTask("billing:charge_recurring", payload)

_, err := client.Enqueue(task)

Delayed (specific time)

task := asynq.NewTask("billing:retry_charge", payload)
_, err := client.Enqueue(task, asynq.ProcessAt(time.Now().Add(24*time.Hour)))

With options

_, err := client.Enqueue(task,
    asynq.Queue("critical"),
    asynq.MaxRetry(3),
    asynq.Timeout(5*time.Minute),
    asynq.Retention(24*time.Hour),  // completed task 보관 기간
    asynq.Unique(time.Hour),         // 같은 payload 중복 enqueue 방지 (1시간)
)

Cron scheduling

// apps/worker/internal/cron/register.go
func RegisterCrons(scheduler *asynq.Scheduler, client *asynq.Client) error {
    // 정기 결제 cron — 매시 정각
    if _, err := scheduler.Register("0 * * * *", asynq.NewTask("billing:schedule_due", nil)); err != nil {
        return err
    }

    // 일일 스냅샷 — 매일 00:00 KST
    if _, err := scheduler.Register("0 15 * * *", asynq.NewTask("recovery:schedule_daily_snapshot", nil)); err != nil {
        // KST 00:00 = UTC 15:00
        return err
    }

    // 일일 retention cleanup — 매일 04:00 KST
    if _, err := scheduler.Register("0 19 * * *", asynq.NewTask("recovery:cleanup_retention", nil)); err != nil {
        return err
    }

    // 만료 세션 cleanup — 매일 03:00 KST
    if _, err := scheduler.Register("0 18 * * *", asynq.NewTask("identity:cleanup_sessions", nil)); err != nil {
        return err
    }

    // Outbox archive — 매일 05:00 KST
    if _, err := scheduler.Register("0 20 * * *", asynq.NewTask("outbox:archive", nil)); err != nil {
        return err
    }

    return nil
}

Cron handler → task enqueue

Cron task 자체는 빠르게 끝나고 실제 처리는 별도 task 로:

func HandleScheduleDueCharges(ctx context.Context, t *asynq.Task) error {
    subs, err := billingSvc.ListDueForCharge(ctx, time.Now().Add(time.Hour))
    if err != nil { return err }

    for _, s := range subs {
        delay := time.Until(s.NextBillingAt)
        payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: s.ID})
        task := asynq.NewTask("billing:charge_recurring", payload)
        client.Enqueue(task,
            asynq.ProcessIn(delay),
            asynq.MaxRetry(0),
            asynq.Unique(delay + time.Minute),
        )
    }
    return nil
}

이 패턴이 "cron 이 대량 작업을 직접 처리" 보다 안전.

Priority queues

asynq.Config{
    Queues: map[string]int{
        "critical": 5,  // 5/9 worker 할당
        "default":  3,
        "low":      1,
    },
}

Enqueue 시 queue 지정:

client.Enqueue(task, asynq.Queue("critical"))

사용 예:

  • critical — 결제 charge, AntiNuke 응답 관련 알림
  • default — 일반 알림, Live Sync 배치
  • low — retention cleanup, archive

Uniqueness

중복 enqueue 방지:

// 같은 task type + payload 는 1시간 안에 한 번만 enqueue
client.Enqueue(task, asynq.Unique(time.Hour))

내부적으로 Redis key (asynq:unique:...) 에 TTL 로 저장. 동일 요청이 여러 번 들어와도 하나만 처리.

Cron 의 중복 실행 방지에도 유용.

Idempotency

asynq 의 retry 특성상 handler 는 idempotent:

func (h *Handler) HandleChargeRecurring(ctx context.Context, t *asynq.Task) error {
    var payload ChargeRecurringPayload
    json.Unmarshal(t.Payload(), &payload)

    sub, err := h.svc.GetSubscription(ctx, payload.SubscriptionID)
    if err != nil { return err }

    // Idempotency check — 이미 이 cycle 이 결제됐으면 skip
    if sub.LastChargedCycle == sub.CurrentCycle {
        return nil
    }

    return h.svc.ProcessRecurringCharge(ctx, payload.SubscriptionID)
}

Or: DB 쪽에서 UNIQUE constraint (payment_attempts.order_id) 로 중복 거부.

Middleware

func loggingMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        dur := time.Since(start)

        logger.Info("task processed",
            "type", t.Type(),
            "duration_ms", dur.Milliseconds(),
            "error", err,
        )
        return err
    })
}

func metricsMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        tasksReceived.WithLabelValues(t.Type()).Inc()
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        taskDuration.WithLabelValues(t.Type()).Observe(time.Since(start).Seconds())
        if err != nil {
            tasksFailed.WithLabelValues(t.Type()).Inc()
        }
        return err
    })
}

Dead letter handling

// asynq 자동: MaxRetry 초과 시 "archived" 상태로 이동
// 운영자가 asynqmon UI (web) 또는 CLI 로 확인/재큐

// 또는 ErrorHandler 로 외부 알림
srv := asynq.NewServer(opt, asynq.Config{
    ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, t *asynq.Task, err error) {
        logger.Error("task failed",
            "type", t.Type(),
            "error", err,
            "retry_count", asynq.GetRetryCount(ctx),
            "max_retry", asynq.GetMaxRetry(ctx),
        )
    }),
})

Critical task (결제 관련) 의 dead letter 는 Slack 알림 연계 (Phase 2).

Graceful shutdown

// Server shutdown
srv.Shutdown()

// Scheduler shutdown
scheduler.Shutdown()

현재 처리 중인 task 완료 대기 후 종료.

Fly.io 의 SIGTERM 핸들링:

func main() {
    srv := asynq.NewServer(...)
    go func() {
        if err := srv.Run(mux); err != nil { log.Fatal(err) }
    }()

    // Wait for signal
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
    <-sigs

    srv.Shutdown()
}

Observability

asynqmon

asynq 의 공식 web UI. 로컬:

go run github.com/hibiken/asynq/tools/asynq/cmd@latest stats
# 또는 asynqmon (별도 설치)
  • Pending / active / completed / failed task 카운트
  • Queue 별 통계
  • Failed task payload + 에러 조회

Metrics

OpenTelemetry 로 export:

  • Task processing duration (histogram)
  • Tasks processed (counter, by type & status)
  • Queue length (gauge)

Grafana Cloud 대시보드에서 시각화.

Logging

구조화된 필드:

  • task_type
  • task_id
  • queue
  • retry_count
  • duration_ms
  • error (실패 시)

Testing

Unit test handler

func TestBillingHandler_ChargeRecurring(t *testing.T) {
    mockSvc := &mockBillingService{chargeResult: ...}
    h := &BillingJobHandler{svc: mockSvc, logger: testLogger}

    payload, _ := json.Marshal(ChargeRecurringPayload{SubscriptionID: subID})
    task := asynq.NewTask("billing:charge_recurring", payload)

    err := h.HandleChargeRecurring(ctx, task)
    assert.NoError(t, err)
    assert.Len(t, mockSvc.chargeCalls, 1)
}

Integration test

func TestBillingWorker_Integration(t *testing.T) {
    if testing.Short() { t.Skip() }

    redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379", DB: 15}
    client := asynq.NewClient(redisOpt)
    defer client.Close()

    // 실제 task enqueue
    payload, _ := json.Marshal(...)
    client.Enqueue(asynq.NewTask("billing:charge_recurring", payload))

    // 실제 server 기동 + 완료 대기
    srv := asynq.NewServer(redisOpt, ...)
    mux := setupMux(realServices)
    go srv.Run(mux)
    defer srv.Shutdown()

    // assertion (DB 상태 확인)
    time.Sleep(2 * time.Second)
    sub, _ := billingRepo.GetByID(ctx, subID)
    assert.Equal(t, "active", sub.Status)
}

Do / Don't

Do

  • ✅ Task type 에 도메인 prefix (billing:, recovery:)
  • ✅ Handler idempotent
  • ✅ Retry 가능한 / 불가능한 에러 구분
  • asynq.Unique 로 중복 방지
  • ✅ Priority queue 활용
  • ✅ Cron 은 간단히, 실제 처리는 enqueue 후 별도 task
  • ✅ 구조화된 로깅
  • ✅ Graceful shutdown

Don't

  • ❌ asynq 에 장기 workflow 구현 (Temporal 사용)
  • ❌ Non-idempotent handler
  • ❌ Handler 에서 panic (recover 있지만 원칙적 X)
  • ❌ Payload 에 sensitive data (암호화 후 ID 만 전달)
  • ❌ 무한 retry (MaxRetry 명시)

See also

  • temporal-workflow.md — 장기 workflow
  • outbox-pattern.md — 이벤트 전달
  • backend-conventions.md
  • ../adr/0007-cache-queue-redis.md
  • ../adr/0015-asynq-vs-temporal-split.md