콘텐츠로 이동

Outbox Pattern Implementation

Umbra 의 Outbox 패턴 실무 가이드. 도메인 상태 변경과 이벤트 발행을 같은 트랜잭션으로 보장하고, 별도 poller 가 이벤트를 구독자에게 dispatch 한다. At-least-once 전달과 idempotent 처리가 핵심.

Why Outbox

MVP 에서 "도메인 A 가 변하면 도메인 B 에 알려야 함" 요구가 많다 (결제 → License, License → Recovery 기능 활성 등). 단순 in-memory 호출이나 채널은 프로세스 장애 시 이벤트 손실 가능.

Outbox 는:

  • Atomicity — 도메인 변경과 이벤트 기록을 같은 DB 트랜잭션 에 담아 "반쪽 실패" 방지
  • Durability — 이벤트는 DB 에 저장되므로 프로세스 crash 에 강함
  • At-least-once — poller 가 확실히 전달하고 구독자 idempotent 처리로 중복 안전
  • Replay — 과거 이벤트 재전달 가능 (Audit 에 자동 저장)

ADR-0016 에 근거.

Architecture

Core components

  1. events.outbox 테이블 — BIGSERIAL id, aggregate_type, aggregate_id, event_type, payload (JSONB), published_at, retry_count
  2. Publisher — 도메인 서비스가 트랜잭션 안에서 INSERT
  3. Poller — Worker 가 2초 interval 로 unpublished 이벤트 조회 → dispatch
  4. Subscribers — 각 도메인의 handler (Audit, Notification, Licensing, Recovery 등)
sequenceDiagram
    participant Svc as Domain Service
    participant TX as DB TX
    participant Outbox as events.outbox
    participant Poller as Worker Poller
    participant Subs as Subscribers

    Svc->>TX: BEGIN
    Svc->>TX: UPDATE domain_table SET ...
    Svc->>TX: INSERT INTO events.outbox (...)
    Svc->>TX: COMMIT

    Note over Poller: Every 2s
    Poller->>Outbox: SELECT ... WHERE published_at IS NULL
ORDER BY id ASC LIMIT 50 loop each event Poller->>Subs: Dispatch to subscribers (Audit, Notification, ...) alt All subscribers OK Poller->>Outbox: UPDATE published_at = NOW() else Some failed Poller->>Outbox: UPDATE retry_count++, next_retry_at = ... end end

events.outbox 테이블

스키마 상세: data/events-schema.md. 요약:

CREATE TABLE events.outbox (
    id              BIGSERIAL PRIMARY KEY,
    aggregate_type  TEXT NOT NULL,
    aggregate_id    UUID NOT NULL,
    event_type      TEXT NOT NULL,
    payload         JSONB NOT NULL,
    occurred_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ,
    retry_count     INTEGER NOT NULL DEFAULT 0,
    last_error      TEXT,
    next_retry_at   TIMESTAMPTZ
);

CREATE INDEX outbox_unpublished_idx
ON events.outbox (id) WHERE published_at IS NULL;
  • BIGSERIAL — 순서 보장 (ADR-0021 예외)
  • Partial index — poller scan 최적화

Publisher (도메인에서)

Port interface

// engine/billing/port/event.go
type EventPublisher interface {
    Publish(ctx context.Context, event domain.Event) error
}

// domain.Event 은 interface
type Event interface {
    AggregateType() string
    AggregateID() uuid.UUID
    EventType() string
    Payload() (json.RawMessage, error)
    OccurredAt() time.Time
}

Concrete events

// engine/billing/domain/events.go
type PaymentSucceededEvent struct {
    SubscriptionID uuid.UUID
    AttemptID      uuid.UUID
    LicenseID      uuid.UUID
    NewPeriodEnd   time.Time
    CycleCount     int
    OccurredAtTime time.Time
}

func (e PaymentSucceededEvent) AggregateType() string { return "subscription" }
func (e PaymentSucceededEvent) AggregateID() uuid.UUID { return e.SubscriptionID }
func (e PaymentSucceededEvent) EventType() string { return "PaymentSucceeded" }
func (e PaymentSucceededEvent) Payload() (json.RawMessage, error) {
    return json.Marshal(map[string]any{
        "version": 1,
        "data": map[string]any{
            "subscription_id": e.SubscriptionID,
            "attempt_id":      e.AttemptID,
            "license_id":      e.LicenseID,
            "new_period_end":  e.NewPeriodEnd,
            "cycle_count":     e.CycleCount,
        },
    })
}
func (e PaymentSucceededEvent) OccurredAt() time.Time { return e.OccurredAtTime }

Adapter 구현

// platform/event/outbox_publisher.go
type OutboxPublisher struct {
    queries *sqlc.Queries
}

func NewOutboxPublisher(q *sqlc.Queries) port.EventPublisher {
    return &OutboxPublisher{queries: q}
}

func (p *OutboxPublisher) Publish(ctx context.Context, event domain.Event) error {
    payload, err := event.Payload()
    if err != nil { return fmt.Errorf("marshal payload: %w", err) }

    return p.queries.InsertOutboxEvent(ctx, sqlc.InsertOutboxEventParams{
        AggregateType: event.AggregateType(),
        AggregateID:   event.AggregateID(),
        EventType:     event.EventType(),
        Payload:       payload,
        OccurredAt:    event.OccurredAt(),
    })
}

사용 (트랜잭션 내)

// engine/billing/app/process_recurring.go
func (s *Service) handleChargeSuccess(ctx context.Context, sub domain.Subscription, attempt domain.PaymentAttempt, result *port.ChargeResult) error {
    return s.tx.WithTx(ctx, func(tx port.TxRepos) error {
        // Attempt 업데이트
        attempt.Status = "succeeded"
        attempt.TossPaymentKey = &result.PaymentKey
        attempt.TossApprovedAt = &result.ApprovedAt
        if err := tx.Attempts.Update(ctx, attempt); err != nil { return err }

        // Subscription advance
        sub.AdvancePeriod(newEnd, nextBilling)
        if err := tx.Subs.Update(ctx, sub); err != nil { return err }

        // 같은 트랜잭션에서 이벤트 발행
        return tx.Events.Publish(ctx, domain.PaymentSucceededEvent{
            SubscriptionID: sub.ID,
            AttemptID:      attempt.ID,
            LicenseID:      sub.LicenseID,
            NewPeriodEnd:   newEnd,
            CycleCount:     sub.CycleCount,
            OccurredAtTime: time.Now(),
        })
    })
}

tx.Events 는 같은 sqlc queries 인스턴스 (같은 트랜잭션) 를 쓰는 OutboxPublisher.

Poller

단일 인스턴스 전제

MVP 는 단일 poller (Worker 프로세스 안의 하나의 고루틴). 수평 확장 시 FOR UPDATE SKIP LOCKED 추가로 가능.

// apps/worker/internal/outbox/poller.go
type Poller struct {
    queries   *sqlc.Queries
    dispatch  Dispatcher
    interval  time.Duration
    batchSize int
    logger    *slog.Logger
}

func NewPoller(q *sqlc.Queries, d Dispatcher, logger *slog.Logger) *Poller {
    return &Poller{
        queries: q, dispatch: d, logger: logger,
        interval: 2 * time.Second, batchSize: 50,
    }
}

func (p *Poller) Run(ctx context.Context) error {
    ticker := time.NewTicker(p.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done(): return ctx.Err()
        case <-ticker.C:
            if err := p.processBatch(ctx); err != nil {
                p.logger.Error("outbox poll failed", "error", err)
            }
        }
    }
}

func (p *Poller) processBatch(ctx context.Context) error {
    events, err := p.queries.FetchUnpublishedBatch(ctx, int32(p.batchSize))
    if err != nil { return err }
    if len(events) == 0 { return nil }

    for _, evt := range events {
        p.processEvent(ctx, evt)
    }
    return nil
}

func (p *Poller) processEvent(ctx context.Context, evt sqlc.OutboxEvent) {
    if err := p.dispatch.Dispatch(ctx, evt); err != nil {
        p.logger.Error("dispatch failed", "id", evt.ID, "event_type", evt.EventType, "error", err)

        // Exponential backoff: 1m, 5m, 15m, 1h, 4h, 24h
        delay := retryDelays[evt.RetryCount]
        if evt.RetryCount >= len(retryDelays)-1 {
            // Dead letter: retry 멈추고 alert
            p.logger.Error("DEAD LETTER", "event_id", evt.ID)
            // retry_count 증가시키지 않음 (무한 반복 방지)
            return
        }

        p.queries.MarkOutboxRetry(ctx, sqlc.MarkOutboxRetryParams{
            ID:          evt.ID,
            LastError:   err.Error(),
            NextRetryAt: time.Now().Add(delay),
        })
        return
    }

    p.queries.MarkOutboxPublished(ctx, evt.ID)
}

var retryDelays = []time.Duration{
    1 * time.Minute,
    5 * time.Minute,
    15 * time.Minute,
    1 * time.Hour,
    4 * time.Hour,
    24 * time.Hour,
}

Fetch 쿼리

-- name: FetchUnpublishedBatch :many
SELECT *
FROM events.outbox
WHERE published_at IS NULL
  AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY id ASC
LIMIT $1;

ORDER BY id ASC 가 순서 보장 (BIGSERIAL).

Mark published

-- name: MarkOutboxPublished :exec
UPDATE events.outbox
SET published_at = NOW()
WHERE id = $1;

Mark retry

-- name: MarkOutboxRetry :exec
UPDATE events.outbox
SET retry_count = retry_count + 1,
    last_error = $2,
    next_retry_at = $3
WHERE id = $1;

Dispatcher

이벤트를 관련 구독자들에게 fanout. 동기 실행 (각 구독자가 빠르게 끝나야 함; 장기 작업은 asynq / Temporal 로).

// apps/worker/internal/outbox/dispatcher.go
type Dispatcher interface {
    Dispatch(ctx context.Context, evt sqlc.OutboxEvent) error
}

type dispatcher struct {
    audit         audit.Consumer
    notification  notification.Consumer
    licensing     licensing.Consumer
    recovery      recovery.Consumer
    logger        *slog.Logger
}

func (d *dispatcher) Dispatch(ctx context.Context, evt sqlc.OutboxEvent) error {
    subscribers := d.resolveSubscribers(evt.EventType)

    for _, sub := range subscribers {
        if err := sub.Handle(ctx, evt); err != nil {
            return fmt.Errorf("%s failed: %w", sub.Name(), err)
        }
    }
    return nil
}

func (d *dispatcher) resolveSubscribers(eventType string) []Subscriber {
    // Audit 은 모든 이벤트 구독
    result := []Subscriber{d.audit}

    switch eventType {
    case "PaymentSucceeded":
        result = append(result, d.licensing, d.notification)
    case "PaymentFailedFinal":
        result = append(result, d.licensing, d.notification, d.recovery)
    case "SubscriptionStarted":
        result = append(result, d.licensing, d.notification)
    case "LicenseUpgraded":
        result = append(result, d.recovery, d.notification)
    case "BotInstalled":
        result = append(result, d.licensing)
    case "BotKicked":
        result = append(result, d.licensing, d.recovery)
    // ... 매핑
    }
    return result
}

구독자 매핑은 중앙화. 새 이벤트 추가 시 여기 수정.

Subscribers (consumer)

Audit (모든 이벤트)

// engine/audit/consumer.go
type Consumer struct {
    svc Service
}

func (c *Consumer) Handle(ctx context.Context, evt sqlc.OutboxEvent) error {
    auditEvent := AuditEvent{
        ID:             uuid.NewV7(),
        OutboxEventID:  evt.ID,
        AggregateType:  evt.AggregateType,
        AggregateID:    evt.AggregateID,
        EventType:      evt.EventType,
        Payload:        evt.Payload,
        OccurredAt:     evt.OccurredAt,
        // ...
    }
    return c.svc.RecordEvent(ctx, auditEvent)  // UNIQUE (outbox_event_id) 로 중복 거부
}

func (c *Consumer) Name() string { return "audit" }

Licensing

// engine/licensing/consumer.go
func (c *Consumer) Handle(ctx context.Context, evt sqlc.OutboxEvent) error {
    switch evt.EventType {
    case "SubscriptionStarted":
        return c.onSubscriptionStarted(ctx, evt)
    case "PaymentSucceeded":
        return c.onPaymentSucceeded(ctx, evt)
    case "PaymentFailedFinal":
        return c.onPaymentFailedFinal(ctx, evt)
    case "BotInstalled":
        return c.onBotInstalled(ctx, evt)
    case "BotKicked":
        return c.onBotKicked(ctx, evt)
    }
    return nil  // 관심 없는 이벤트는 skip
}

func (c *Consumer) onPaymentSucceeded(ctx context.Context, evt sqlc.OutboxEvent) error {
    var payload PaymentSucceededPayload
    if err := unmarshalPayload(evt.Payload, &payload); err != nil { return err }

    // Idempotent: GREATEST 로 연장
    return c.svc.ExtendExpiresAt(ctx, payload.LicenseID, payload.NewPeriodEnd)
}

Idempotency 원칙

구독자는 at-least-once 전제 로 작성. 같은 이벤트가 두 번 와도 결과 동일해야 함.

방법:

  • DB UNIQUE 제약 — Audit 의 outbox_event_id UNIQUE
  • 조건부 UPDATEGREATEST(expires_at, $1), WHERE status != 'canceled'
  • 상태 체크 — "이미 처리된 상태인지" 확인 후 skip

Event payload versioning

{
  "version": 1,
  "data": { ... },
  "metadata": {
    "causation_id": "uuid",
    "correlation_id": "uuid"
  }
}

Version 관리

  • version 은 payload schema 버전
  • 필드 추가는 backwards-compatible (old subscriber 는 모르는 필드 무시)
  • 필드 삭제, 이름 변경은 major bump (새 event type 또는 version++)

Deserialize

type PaymentSucceededPayload struct {
    Version int `json:"version"`
    Data struct {
        SubscriptionID uuid.UUID `json:"subscription_id"`
        LicenseID      uuid.UUID `json:"license_id"`
        NewPeriodEnd   time.Time `json:"new_period_end"`
    } `json:"data"`
}

func unmarshalPayload(raw json.RawMessage, out any) error {
    if err := json.Unmarshal(raw, out); err != nil { return err }
    if versioned, ok := out.(interface{ GetVersion() int }); ok {
        if versioned.GetVersion() != 1 {
            return fmt.Errorf("unsupported payload version: %d", versioned.GetVersion())
        }
    }
    return nil
}

Retention & archive

처리 완료 이벤트 삭제

Daily cron:

-- name: ArchiveOldOutboxEvents :execrows
DELETE FROM events.outbox
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '30 days';
  • Audit 가 이미 영구 보관하므로 outbox 삭제 안전
  • 30일 버퍼는 혹시 모를 재처리 필요 대비

Dead letter

retry_count >= len(retryDelays) 된 이벤트는 삭제 금지:

  • 운영자가 수동 개입 (데이터 복구 + 재처리)
  • 근본 원인 파악 후 다시 처리
-- 운영자 수동 재처리
UPDATE events.outbox
SET retry_count = 0,
    next_retry_at = NULL,
    last_error = NULL
WHERE id IN ($1, $2, ...);

Dispatcher 실패 시

한 구독자 실패 → 전체 재시도

현재 구현은 dispatcher 가 모든 구독자 순차 호출. 하나라도 실패하면 이벤트 전체 retry.

단점: 한 구독자 문제가 다른 구독자에게도 중복 전달됨.

해결: 구독자 idempotent. 같은 이벤트 여러 번 받아도 안전.

대안 (Phase 2): per-subscriber delivery state 추적. 테이블 추가 또는 payload 에 subscriber 배열.

Transient vs permanent error

  • Transient (DB 순간 장애, 외부 API 503): retry 후 성공
  • Permanent (payload parse 실패, business 규칙 위반): retry 무의미

현재 단순 구현: 모두 retry. Phase 2 에서 error type 구분.

Ordering guarantees

전체 순서

Poller 가 단일 고루틴 + ORDER BY id ASC전체 순서 엄격 유지.

Aggregate 내 순서

같은 aggregate 의 이벤트는 시간순 INSERT 되므로 자연히 순서 유지.

수평 확장 시 주의

Poller 여러 개 띄우면 순서 깨짐. Aggregate-level 순서 필요하면:

  • Sharding by aggregate_id hash
  • 또는 FOR UPDATE SKIP LOCKED + per-aggregate lock

MVP 단계에서는 단일 poller 로 충분.

Monitoring

Metrics

  • outbox_events_pending_count (gauge) — unpublished 건수
  • outbox_events_dispatched_total{event_type, status} (counter)
  • outbox_events_dispatch_duration_seconds (histogram)
  • outbox_events_dead_letter_count (gauge) — retry 무한 반복 중 건수

Dashboard 에서 불러 증가 감지 시 alert.

Logs

  • Batch 처리 시작/종료
  • 실패 이벤트 상세 (event_id, event_type, subscriber, error)
  • Dead letter alert

Testing

Publisher unit test

func TestBillingService_PublishesEvent(t *testing.T) {
    mockEvents := &mockEventPublisher{}
    svc := app.NewService(..., mockEvents, ...)

    svc.handleChargeSuccess(ctx, sub, attempt, result)

    assert.Len(t, mockEvents.published, 1)
    evt := mockEvents.published[0].(domain.PaymentSucceededEvent)
    assert.Equal(t, sub.ID, evt.SubscriptionID)
}

Consumer unit test

func TestLicensingConsumer_OnPaymentSucceeded(t *testing.T) {
    mockSvc := &mockLicensingService{}
    c := &Consumer{svc: mockSvc}

    payload := `{"version":1,"data":{"license_id":"...","new_period_end":"..."}}`
    evt := sqlc.OutboxEvent{
        EventType: "PaymentSucceeded",
        Payload:   json.RawMessage(payload),
    }

    err := c.Handle(ctx, evt)
    assert.NoError(t, err)
    assert.Len(t, mockSvc.extendCalls, 1)
}

Integration test

실제 DB 트랜잭션으로 publish → poll → dispatch 검증.

Do / Don't

Do

  • ✅ 도메인 상태 변경과 이벤트 INSERT 를 같은 트랜잭션에
  • ✅ 구독자 idempotent 설계
  • ✅ Event payload 에 version 필드
  • ✅ Dead letter 는 삭제 금지 (수동 개입)
  • ✅ Dispatcher 에서 구독자 명시적 매핑
  • ✅ Payload 에 aggregate_id 포함
  • ✅ 30일 후 published 이벤트 archive

Don't

  • ❌ 외부 API 호출을 같은 트랜잭션에 (외부 응답 기다리며 lock)
  • ❌ Payload 에 민감 정보 (PII, secret)
  • ❌ Non-idempotent 구독자
  • ❌ Version 없는 payload
  • ❌ Poller 병렬 실행 (순서 깨짐)
  • ❌ Dead letter 자동 삭제

See also

  • temporal-workflow.md
  • asynq-jobs.md
  • backend-conventions.md
  • database-conventions.md
  • ../data/events-schema.md
  • ../architecture/event-flow.md
  • ../adr/0016-outbox-pattern.md