Outbox Pattern Implementation¶
Umbra 의 Outbox 패턴 실무 가이드. 도메인 상태 변경과 이벤트 발행을 같은 트랜잭션으로 보장하고, 별도 poller 가 이벤트를 구독자에게 dispatch 한다. At-least-once 전달과 idempotent 처리가 핵심.
Why Outbox¶
MVP 에서 "도메인 A 가 변하면 도메인 B 에 알려야 함" 요구가 많다 (결제 → License, License → Recovery 기능 활성 등). 단순 in-memory 호출이나 채널은 프로세스 장애 시 이벤트 손실 가능.
Outbox 는:
- Atomicity — 도메인 변경과 이벤트 기록을 같은 DB 트랜잭션 에 담아 "반쪽 실패" 방지
- Durability — 이벤트는 DB 에 저장되므로 프로세스 crash 에 강함
- At-least-once — poller 가 확실히 전달하고 구독자 idempotent 처리로 중복 안전
- Replay — 과거 이벤트 재전달 가능 (Audit 에 자동 저장)
ADR-0016 에 근거.
Architecture¶
Core components¶
events.outbox테이블 — BIGSERIAL id, aggregate_type, aggregate_id, event_type, payload (JSONB), published_at, retry_count- Publisher — 도메인 서비스가 트랜잭션 안에서 INSERT
- Poller — Worker 가 2초 interval 로 unpublished 이벤트 조회 → dispatch
- Subscribers — 각 도메인의 handler (Audit, Notification, Licensing, Recovery 등)
sequenceDiagram
participant Svc as Domain Service
participant TX as DB TX
participant Outbox as events.outbox
participant Poller as Worker Poller
participant Subs as Subscribers
Svc->>TX: BEGIN
Svc->>TX: UPDATE domain_table SET ...
Svc->>TX: INSERT INTO events.outbox (...)
Svc->>TX: COMMIT
Note over Poller: Every 2s
Poller->>Outbox: SELECT ... WHERE published_at IS NULL
ORDER BY id ASC LIMIT 50
loop each event
Poller->>Subs: Dispatch to subscribers (Audit, Notification, ...)
alt All subscribers OK
Poller->>Outbox: UPDATE published_at = NOW()
else Some failed
Poller->>Outbox: UPDATE retry_count++, next_retry_at = ...
end
end
events.outbox 테이블¶
스키마 상세: data/events-schema.md. 요약:
CREATE TABLE events.outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL,
aggregate_id UUID NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
next_retry_at TIMESTAMPTZ
);
CREATE INDEX outbox_unpublished_idx
ON events.outbox (id) WHERE published_at IS NULL;
- BIGSERIAL — 순서 보장 (ADR-0021 예외)
- Partial index — poller scan 최적화
Publisher (도메인에서)¶
Port interface¶
// engine/billing/port/event.go
type EventPublisher interface {
Publish(ctx context.Context, event domain.Event) error
}
// domain.Event 은 interface
type Event interface {
AggregateType() string
AggregateID() uuid.UUID
EventType() string
Payload() (json.RawMessage, error)
OccurredAt() time.Time
}
Concrete events¶
// engine/billing/domain/events.go
type PaymentSucceededEvent struct {
SubscriptionID uuid.UUID
AttemptID uuid.UUID
LicenseID uuid.UUID
NewPeriodEnd time.Time
CycleCount int
OccurredAtTime time.Time
}
func (e PaymentSucceededEvent) AggregateType() string { return "subscription" }
func (e PaymentSucceededEvent) AggregateID() uuid.UUID { return e.SubscriptionID }
func (e PaymentSucceededEvent) EventType() string { return "PaymentSucceeded" }
func (e PaymentSucceededEvent) Payload() (json.RawMessage, error) {
return json.Marshal(map[string]any{
"version": 1,
"data": map[string]any{
"subscription_id": e.SubscriptionID,
"attempt_id": e.AttemptID,
"license_id": e.LicenseID,
"new_period_end": e.NewPeriodEnd,
"cycle_count": e.CycleCount,
},
})
}
func (e PaymentSucceededEvent) OccurredAt() time.Time { return e.OccurredAtTime }
Adapter 구현¶
// platform/event/outbox_publisher.go
type OutboxPublisher struct {
queries *sqlc.Queries
}
func NewOutboxPublisher(q *sqlc.Queries) port.EventPublisher {
return &OutboxPublisher{queries: q}
}
func (p *OutboxPublisher) Publish(ctx context.Context, event domain.Event) error {
payload, err := event.Payload()
if err != nil { return fmt.Errorf("marshal payload: %w", err) }
return p.queries.InsertOutboxEvent(ctx, sqlc.InsertOutboxEventParams{
AggregateType: event.AggregateType(),
AggregateID: event.AggregateID(),
EventType: event.EventType(),
Payload: payload,
OccurredAt: event.OccurredAt(),
})
}
사용 (트랜잭션 내)¶
// engine/billing/app/process_recurring.go
func (s *Service) handleChargeSuccess(ctx context.Context, sub domain.Subscription, attempt domain.PaymentAttempt, result *port.ChargeResult) error {
return s.tx.WithTx(ctx, func(tx port.TxRepos) error {
// Attempt 업데이트
attempt.Status = "succeeded"
attempt.TossPaymentKey = &result.PaymentKey
attempt.TossApprovedAt = &result.ApprovedAt
if err := tx.Attempts.Update(ctx, attempt); err != nil { return err }
// Subscription advance
sub.AdvancePeriod(newEnd, nextBilling)
if err := tx.Subs.Update(ctx, sub); err != nil { return err }
// 같은 트랜잭션에서 이벤트 발행
return tx.Events.Publish(ctx, domain.PaymentSucceededEvent{
SubscriptionID: sub.ID,
AttemptID: attempt.ID,
LicenseID: sub.LicenseID,
NewPeriodEnd: newEnd,
CycleCount: sub.CycleCount,
OccurredAtTime: time.Now(),
})
})
}
tx.Events 는 같은 sqlc queries 인스턴스 (같은 트랜잭션) 를 쓰는 OutboxPublisher.
Poller¶
단일 인스턴스 전제¶
MVP 는 단일 poller (Worker 프로세스 안의 하나의 고루틴). 수평 확장 시 FOR UPDATE SKIP LOCKED 추가로 가능.
// apps/worker/internal/outbox/poller.go
type Poller struct {
queries *sqlc.Queries
dispatch Dispatcher
interval time.Duration
batchSize int
logger *slog.Logger
}
func NewPoller(q *sqlc.Queries, d Dispatcher, logger *slog.Logger) *Poller {
return &Poller{
queries: q, dispatch: d, logger: logger,
interval: 2 * time.Second, batchSize: 50,
}
}
func (p *Poller) Run(ctx context.Context) error {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done(): return ctx.Err()
case <-ticker.C:
if err := p.processBatch(ctx); err != nil {
p.logger.Error("outbox poll failed", "error", err)
}
}
}
}
func (p *Poller) processBatch(ctx context.Context) error {
events, err := p.queries.FetchUnpublishedBatch(ctx, int32(p.batchSize))
if err != nil { return err }
if len(events) == 0 { return nil }
for _, evt := range events {
p.processEvent(ctx, evt)
}
return nil
}
func (p *Poller) processEvent(ctx context.Context, evt sqlc.OutboxEvent) {
if err := p.dispatch.Dispatch(ctx, evt); err != nil {
p.logger.Error("dispatch failed", "id", evt.ID, "event_type", evt.EventType, "error", err)
// Exponential backoff: 1m, 5m, 15m, 1h, 4h, 24h
delay := retryDelays[evt.RetryCount]
if evt.RetryCount >= len(retryDelays)-1 {
// Dead letter: retry 멈추고 alert
p.logger.Error("DEAD LETTER", "event_id", evt.ID)
// retry_count 증가시키지 않음 (무한 반복 방지)
return
}
p.queries.MarkOutboxRetry(ctx, sqlc.MarkOutboxRetryParams{
ID: evt.ID,
LastError: err.Error(),
NextRetryAt: time.Now().Add(delay),
})
return
}
p.queries.MarkOutboxPublished(ctx, evt.ID)
}
var retryDelays = []time.Duration{
1 * time.Minute,
5 * time.Minute,
15 * time.Minute,
1 * time.Hour,
4 * time.Hour,
24 * time.Hour,
}
Fetch 쿼리¶
-- name: FetchUnpublishedBatch :many
SELECT *
FROM events.outbox
WHERE published_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY id ASC
LIMIT $1;
ORDER BY id ASC 가 순서 보장 (BIGSERIAL).
Mark published¶
Mark retry¶
-- name: MarkOutboxRetry :exec
UPDATE events.outbox
SET retry_count = retry_count + 1,
last_error = $2,
next_retry_at = $3
WHERE id = $1;
Dispatcher¶
이벤트를 관련 구독자들에게 fanout. 동기 실행 (각 구독자가 빠르게 끝나야 함; 장기 작업은 asynq / Temporal 로).
// apps/worker/internal/outbox/dispatcher.go
type Dispatcher interface {
Dispatch(ctx context.Context, evt sqlc.OutboxEvent) error
}
type dispatcher struct {
audit audit.Consumer
notification notification.Consumer
licensing licensing.Consumer
recovery recovery.Consumer
logger *slog.Logger
}
func (d *dispatcher) Dispatch(ctx context.Context, evt sqlc.OutboxEvent) error {
subscribers := d.resolveSubscribers(evt.EventType)
for _, sub := range subscribers {
if err := sub.Handle(ctx, evt); err != nil {
return fmt.Errorf("%s failed: %w", sub.Name(), err)
}
}
return nil
}
func (d *dispatcher) resolveSubscribers(eventType string) []Subscriber {
// Audit 은 모든 이벤트 구독
result := []Subscriber{d.audit}
switch eventType {
case "PaymentSucceeded":
result = append(result, d.licensing, d.notification)
case "PaymentFailedFinal":
result = append(result, d.licensing, d.notification, d.recovery)
case "SubscriptionStarted":
result = append(result, d.licensing, d.notification)
case "LicenseUpgraded":
result = append(result, d.recovery, d.notification)
case "BotInstalled":
result = append(result, d.licensing)
case "BotKicked":
result = append(result, d.licensing, d.recovery)
// ... 매핑
}
return result
}
구독자 매핑은 중앙화. 새 이벤트 추가 시 여기 수정.
Subscribers (consumer)¶
Audit (모든 이벤트)¶
// engine/audit/consumer.go
type Consumer struct {
svc Service
}
func (c *Consumer) Handle(ctx context.Context, evt sqlc.OutboxEvent) error {
auditEvent := AuditEvent{
ID: uuid.NewV7(),
OutboxEventID: evt.ID,
AggregateType: evt.AggregateType,
AggregateID: evt.AggregateID,
EventType: evt.EventType,
Payload: evt.Payload,
OccurredAt: evt.OccurredAt,
// ...
}
return c.svc.RecordEvent(ctx, auditEvent) // UNIQUE (outbox_event_id) 로 중복 거부
}
func (c *Consumer) Name() string { return "audit" }
Licensing¶
// engine/licensing/consumer.go
func (c *Consumer) Handle(ctx context.Context, evt sqlc.OutboxEvent) error {
switch evt.EventType {
case "SubscriptionStarted":
return c.onSubscriptionStarted(ctx, evt)
case "PaymentSucceeded":
return c.onPaymentSucceeded(ctx, evt)
case "PaymentFailedFinal":
return c.onPaymentFailedFinal(ctx, evt)
case "BotInstalled":
return c.onBotInstalled(ctx, evt)
case "BotKicked":
return c.onBotKicked(ctx, evt)
}
return nil // 관심 없는 이벤트는 skip
}
func (c *Consumer) onPaymentSucceeded(ctx context.Context, evt sqlc.OutboxEvent) error {
var payload PaymentSucceededPayload
if err := unmarshalPayload(evt.Payload, &payload); err != nil { return err }
// Idempotent: GREATEST 로 연장
return c.svc.ExtendExpiresAt(ctx, payload.LicenseID, payload.NewPeriodEnd)
}
Idempotency 원칙¶
구독자는 at-least-once 전제 로 작성. 같은 이벤트가 두 번 와도 결과 동일해야 함.
방법:
- DB UNIQUE 제약 — Audit 의
outbox_event_id UNIQUE - 조건부 UPDATE —
GREATEST(expires_at, $1),WHERE status != 'canceled' - 상태 체크 — "이미 처리된 상태인지" 확인 후 skip
Event payload versioning¶
Version 관리¶
version은 payload schema 버전- 필드 추가는 backwards-compatible (old subscriber 는 모르는 필드 무시)
- 필드 삭제, 이름 변경은 major bump (새 event type 또는 version++)
Deserialize¶
type PaymentSucceededPayload struct {
Version int `json:"version"`
Data struct {
SubscriptionID uuid.UUID `json:"subscription_id"`
LicenseID uuid.UUID `json:"license_id"`
NewPeriodEnd time.Time `json:"new_period_end"`
} `json:"data"`
}
func unmarshalPayload(raw json.RawMessage, out any) error {
if err := json.Unmarshal(raw, out); err != nil { return err }
if versioned, ok := out.(interface{ GetVersion() int }); ok {
if versioned.GetVersion() != 1 {
return fmt.Errorf("unsupported payload version: %d", versioned.GetVersion())
}
}
return nil
}
Retention & archive¶
처리 완료 이벤트 삭제¶
Daily cron:
-- name: ArchiveOldOutboxEvents :execrows
DELETE FROM events.outbox
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '30 days';
- Audit 가 이미 영구 보관하므로 outbox 삭제 안전
- 30일 버퍼는 혹시 모를 재처리 필요 대비
Dead letter¶
retry_count >= len(retryDelays) 된 이벤트는 삭제 금지:
- 운영자가 수동 개입 (데이터 복구 + 재처리)
- 근본 원인 파악 후 다시 처리
-- 운영자 수동 재처리
UPDATE events.outbox
SET retry_count = 0,
next_retry_at = NULL,
last_error = NULL
WHERE id IN ($1, $2, ...);
Dispatcher 실패 시¶
한 구독자 실패 → 전체 재시도¶
현재 구현은 dispatcher 가 모든 구독자 순차 호출. 하나라도 실패하면 이벤트 전체 retry.
단점: 한 구독자 문제가 다른 구독자에게도 중복 전달됨.
해결: 구독자 idempotent. 같은 이벤트 여러 번 받아도 안전.
대안 (Phase 2): per-subscriber delivery state 추적. 테이블 추가 또는 payload 에 subscriber 배열.
Transient vs permanent error¶
- Transient (DB 순간 장애, 외부 API 503): retry 후 성공
- Permanent (payload parse 실패, business 규칙 위반): retry 무의미
현재 단순 구현: 모두 retry. Phase 2 에서 error type 구분.
Ordering guarantees¶
전체 순서¶
Poller 가 단일 고루틴 + ORDER BY id ASC 라 전체 순서 엄격 유지.
Aggregate 내 순서¶
같은 aggregate 의 이벤트는 시간순 INSERT 되므로 자연히 순서 유지.
수평 확장 시 주의¶
Poller 여러 개 띄우면 순서 깨짐. Aggregate-level 순서 필요하면:
- Sharding by
aggregate_idhash - 또는
FOR UPDATE SKIP LOCKED+ per-aggregate lock
MVP 단계에서는 단일 poller 로 충분.
Monitoring¶
Metrics¶
outbox_events_pending_count(gauge) — unpublished 건수outbox_events_dispatched_total{event_type, status}(counter)outbox_events_dispatch_duration_seconds(histogram)outbox_events_dead_letter_count(gauge) — retry 무한 반복 중 건수
Dashboard 에서 불러 증가 감지 시 alert.
Logs¶
- Batch 처리 시작/종료
- 실패 이벤트 상세 (event_id, event_type, subscriber, error)
- Dead letter alert
Testing¶
Publisher unit test¶
func TestBillingService_PublishesEvent(t *testing.T) {
mockEvents := &mockEventPublisher{}
svc := app.NewService(..., mockEvents, ...)
svc.handleChargeSuccess(ctx, sub, attempt, result)
assert.Len(t, mockEvents.published, 1)
evt := mockEvents.published[0].(domain.PaymentSucceededEvent)
assert.Equal(t, sub.ID, evt.SubscriptionID)
}
Consumer unit test¶
func TestLicensingConsumer_OnPaymentSucceeded(t *testing.T) {
mockSvc := &mockLicensingService{}
c := &Consumer{svc: mockSvc}
payload := `{"version":1,"data":{"license_id":"...","new_period_end":"..."}}`
evt := sqlc.OutboxEvent{
EventType: "PaymentSucceeded",
Payload: json.RawMessage(payload),
}
err := c.Handle(ctx, evt)
assert.NoError(t, err)
assert.Len(t, mockSvc.extendCalls, 1)
}
Integration test¶
실제 DB 트랜잭션으로 publish → poll → dispatch 검증.
Do / Don't¶
Do¶
- ✅ 도메인 상태 변경과 이벤트 INSERT 를 같은 트랜잭션에
- ✅ 구독자 idempotent 설계
- ✅ Event payload 에 version 필드
- ✅ Dead letter 는 삭제 금지 (수동 개입)
- ✅ Dispatcher 에서 구독자 명시적 매핑
- ✅ Payload 에 aggregate_id 포함
- ✅ 30일 후 published 이벤트 archive
Don't¶
- ❌ 외부 API 호출을 같은 트랜잭션에 (외부 응답 기다리며 lock)
- ❌ Payload 에 민감 정보 (PII, secret)
- ❌ Non-idempotent 구독자
- ❌ Version 없는 payload
- ❌ Poller 병렬 실행 (순서 깨짐)
- ❌ Dead letter 자동 삭제
See also¶
temporal-workflow.mdasynq-jobs.mdbackend-conventions.mddatabase-conventions.md../data/events-schema.md../architecture/event-flow.md../adr/0016-outbox-pattern.md