콘텐츠로 이동

Temporal Workflow Guide

Umbra 가 Temporal 을 사용하는 방식과 실무 패턴. Recovery domain 의 restore / antinuke workflow 작성 가이드를 중심으로 한다. asynq 와 대비되는 사용 기준은 asynq-jobs.md 참조.

Why Temporal

Recovery 는 장기 실행 + 재시작 안전성 + 단계별 재시도가 필요하다. 자체 구현하면 상태 머신과 retry 로직을 수동으로 만들어야 한다. Temporal 은 이를 프레임워크로 제공 (ADR-0014).

핵심 보장:

  • Durable execution — workflow 프로세스가 죽어도 상태 유지, 재시작 후 이어서 실행
  • Automatic retry — Activity 는 configurable backoff 로 자동 재시도
  • Visibility — Temporal Web UI 로 실시간 진행 조회
  • Idempotency guidance — Workflow/Activity 설계가 자연스럽게 idempotent 해짐

When to use

Temporal 적합

  • 복구 (Recovery) — 수십 초 ~ 수 분, 단계별 재시도 필요
  • AntiNuke — 길드당 장기 running 감지기
  • Phase 2+: 장기 사용자 journey (예: 7일 trial → 결제 유도)

Temporal 부적합 (asynq 사용)

  • 짧은 백그라운드 작업 (수 초)
  • cron 기반 정기 실행 (일일 스냅샷, 결제 cron)
  • Outbox poller, Notification 발송
  • Live Sync 배치

상세 비교는 asynq-jobs.md 와 ADR-0015.

Architecture

컴포넌트

  • Temporal Server — Neon Postgres 를 backing store 로 self-host (Phase 2 에서 Temporal Cloud 검토)
  • Workerapps/worker 가 workflow 와 activity 를 실행
  • Clientapps/api 와 도메인 서비스가 workflow 시작/신호 전송

Namespace

umbra-local, umbra-prod 등 환경별 namespace 분리.

Task queues

  • umbra-recovery-restore — Restore workflow
  • umbra-recovery-antinuke — AntiNuke workflow

향후 도메인 확장 시 추가.

Workflow 작성 규칙

Determinism (가장 중요)

Workflow 는 결정적 (deterministic) 이어야 한다. Temporal 이 event history 를 replay 하여 상태를 복원하므로, 같은 history 로 실행 시 항상 같은 결과가 나와야 함.

Workflow 에서 금지:

// ❌ Non-deterministic
time.Now()              // 실제 시각은 replay 시 다름
rand.Intn(10)           // 랜덤 값
http.Get(...)           // 외부 IO
os.ReadFile(...)        // 파일 IO
uuid.NewV7()            // UUID 생성 (시간 기반)
go func() { ... }()     // 병렬 goroutine
time.Sleep(d)           // 일반 sleep
channel send/receive  select timeout  사용

Workflow 에서 허용:

workflow.Now(ctx)                           // deterministic time
workflow.NewTimer(ctx, duration)            // deterministic sleep
workflow.SideEffect(ctx, func() any {...})  // non-det 값 한 번만 계산 후 replay
workflow.ExecuteActivity(ctx, fn, args...)  // 외부 IO 는 Activity 로
workflow.NewSelector(ctx)                   // event-driven 흐름
workflow.ExecuteChildWorkflow(ctx, ...)     // 하위 workflow

Activity 로 분리

외부 IO, 시간 의존, random 은 모두 Activity 로:

// Workflow
func RestoreWorkflow(ctx workflow.Context, input RestoreInput) error {
    var snap SnapshotData
    err := workflow.ExecuteActivity(ctx, ValidateSnapshotActivity, input.SnapshotID).Get(ctx, &snap)
    if err != nil { return err }
    ...
}

// Activity (일반 Go 함수, IO 허용)
func ValidateSnapshotActivity(ctx context.Context, snapshotID uuid.UUID) (SnapshotData, error) {
    snap, err := repo.GetByID(ctx, snapshotID)
    if err != nil { return SnapshotData{}, err }
    if !snap.IsValid() { return SnapshotData{}, ErrInvalidSnapshot }
    return snap.ToActivityResult(), nil
}

Retry policy

ao := workflow.ActivityOptions{
    StartToCloseTimeout: 5 * time.Minute,
    HeartbeatTimeout:    30 * time.Second,
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    10,
        NonRetryableErrorTypes: []string{
            "PermissionDenied",
            "GuildNotFound",
            "InvalidSnapshot",
        },
    },
}
ctx = workflow.WithActivityOptions(ctx, ao)
  • NonRetryableErrorTypes — business logic 오류는 재시도 무의미
  • StartToCloseTimeout — 단일 실행 타임아웃
  • HeartbeatTimeout — 장기 실행 Activity 가 살아있음을 증명

Heartbeat

장기 실행 Activity 는 주기적으로 heartbeat:

func LongActivity(ctx context.Context, input Input) error {
    for _, item := range input.Items {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        processItem(item)
        activity.RecordHeartbeat(ctx, item.ID)  // 진행 상태 전달
    }
    return nil
}

Heartbeat 없이 오래 걸리면 Temporal 이 timeout 판정 → retry.

패턴: Signal 기반 AntiNuke workflow

AntiNuke 는 길드당 장기 running workflow 가 signal 수신하며 rolling window 관리:

func AntiNukeWorkflow(ctx workflow.Context, guildID uuid.UUID) error {
    logger := workflow.GetLogger(ctx)
    windows := newRollingWindows(defaultThresholds())
    antinukeEnabled := true

    // Query handler (외부에서 현재 상태 조회)
    workflow.SetQueryHandler(ctx, "getWindowSizes", func() (map[string]int, error) {
        result := map[string]int{}
        for p, w := range windows {
            result[string(p)] = w.Len()
        }
        return result, nil
    })

    for {
        selector := workflow.NewSelector(ctx)

        // Event signal
        selector.AddReceive(
            workflow.GetSignalChannel(ctx, "event"),
            func(c workflow.ReceiveChannel, more bool) {
                var ev EventSignal
                c.Receive(ctx, &ev)

                if !antinukeEnabled { return }

                window := windows[ev.Pattern]
                window.Push(ev)
                window.EvictOld(workflow.Now(ctx))

                if window.Len() >= window.Threshold() {
                    respondToDetection(ctx, guildID, ev.Pattern, window)
                    window.Clear()
                }
            },
        )

        // Config update signal
        selector.AddReceive(
            workflow.GetSignalChannel(ctx, "config_update"),
            func(c workflow.ReceiveChannel, more bool) {
                var cfg ConfigUpdate
                c.Receive(ctx, &cfg)
                antinukeEnabled = cfg.Enabled
                for pattern, threshold := range cfg.Thresholds {
                    windows[pattern].UpdateThreshold(threshold)
                }
            },
        )

        // Shutdown signal
        selector.AddReceive(
            workflow.GetSignalChannel(ctx, "shutdown"),
            func(c workflow.ReceiveChannel, more bool) {
                var _ any
                c.Receive(ctx, &_)
                logger.Info("antinuke workflow shutting down", "guild_id", guildID)
            },
        )

        // Periodic evict timer
        selector.AddFuture(
            workflow.NewTimer(ctx, time.Minute),
            func(f workflow.Future) {
                now := workflow.Now(ctx)
                for _, w := range windows {
                    w.EvictOld(now)
                }
            },
        )

        selector.Select(ctx)
    }
}

외부에서 signal 전송:

// Go client
err := temporalClient.SignalWorkflow(ctx, workflowID, "", "event", EventSignal{
    Pattern:   MassRoleDelete,
    Timestamp: time.Now(),
    AggregateID: roleID,
})

패턴: Sequential workflow

Restore 처럼 단계별로 실행되는 workflow:

func RestoreWorkflow(ctx workflow.Context, input RestoreInput) (*RestoreResult, error) {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 5 * time.Minute,
        RetryPolicy: restoreRetryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 1. Validate
    var snap SnapshotData
    if err := workflow.ExecuteActivity(ctx, ValidateSnapshotActivity, input.SnapshotID).Get(ctx, &snap); err != nil {
        return nil, err
    }

    // 2. Optional pre-restore snapshot
    if input.Options.CreatePreRestoreSnapshot {
        var preID uuid.UUID
        if err := workflow.ExecuteActivity(ctx, CreatePreRestoreSnapshotActivity, input.GuildID).Get(ctx, &preID); err != nil {
            workflow.GetLogger(ctx).Warn("pre-restore snapshot failed", "error", err)
        }
    }

    // 3. Pause Live Sync with defer resume
    if err := workflow.ExecuteActivity(ctx, PauseLiveSyncActivity, input.GuildID).Get(ctx, nil); err != nil {
        return nil, err
    }
    defer func() {
        // Disconnected context 로 cancellation 에도 실행
        disConnCtx, _ := workflow.NewDisconnectedContext(ctx)
        _ = workflow.ExecuteActivity(disConnCtx, ResumeLiveSyncActivity, input.GuildID, true).Get(disConnCtx, nil)
    }()

    // 4. Delete phase (역순)
    if contains(input.Scope, "permission_overrides") {
        workflow.ExecuteActivity(ctx, DeleteRemovedOverridesActivity, input).Get(ctx, nil)
    }
    if contains(input.Scope, "channels") {
        workflow.ExecuteActivity(ctx, DeleteRemovedChannelsActivity, input).Get(ctx, nil)
    }
    if contains(input.Scope, "roles") {
        workflow.ExecuteActivity(ctx, DeleteRemovedRolesActivity, input).Get(ctx, nil)
    }

    // 5. Create/Update phase (정순)
    if contains(input.Scope, "roles") {
        workflow.ExecuteActivity(ctx, CreateOrUpdateRolesActivity, input).Get(ctx, nil)
    }
    if contains(input.Scope, "channels") {
        workflow.ExecuteActivity(ctx, CreateOrUpdateChannelsActivity, input).Get(ctx, nil)
    }
    if contains(input.Scope, "permission_overrides") {
        workflow.ExecuteActivity(ctx, ApplyPermissionOverridesActivity, input).Get(ctx, nil)
    }
    if contains(input.Scope, "guild_settings") {
        workflow.ExecuteActivity(ctx, UpdateGuildSettingsActivity, input).Get(ctx, nil)
    }

    // 6. Finalize
    result := &RestoreResult{CompletedAt: workflow.Now(ctx)}
    workflow.ExecuteActivity(ctx, FinalizeRestoreJobActivity, input.JobID, result).Get(ctx, nil)

    return result, nil
}

Cancellation

Workflow 가 cancel 될 수 있음 (user cancel, guild deleted 등).

func MyWorkflow(ctx workflow.Context, input Input) error {
    // Cancel 시 진행 중 activity 는 완료 허용
    err := workflow.ExecuteActivity(ctx, SomeActivity, input).Get(ctx, nil)
    if err != nil {
        if temporal.IsCanceledError(err) {
            // cleanup
            disConnCtx, _ := workflow.NewDisconnectedContext(ctx)
            workflow.ExecuteActivity(disConnCtx, CleanupActivity, input).Get(disConnCtx, nil)
        }
        return err
    }
    return nil
}

Disconnected context 의 주 용도: cancellation 에도 cleanup 보장.

Activity 작성 규칙

일반 Go 함수

Activity 는 순수 Go 함수. 외부 IO 자유롭게 사용.

type Activities struct {
    snapshots  snapshot.Service
    discord    discord.Client
    sync       sync.Service
    logger     *slog.Logger
}

func (a *Activities) CreateOrUpdateRolesActivity(ctx context.Context, input RestoreInput) error {
    snap, err := a.snapshots.GetByID(ctx, input.SnapshotID)
    if err != nil { return err }

    current, err := a.discord.FetchRoles(ctx, input.GuildDiscordID)
    if err != nil { return err }

    for _, target := range snap.Payload.Roles {
        existing := findRole(current, target.DiscordID)
        if existing == nil {
            if _, err := a.discord.CreateRole(ctx, input.GuildDiscordID, target.ToSpec()); err != nil {
                return wrapDiscordErr(err)
            }
        } else if !equalRole(existing, target) {
            if err := a.discord.UpdateRole(ctx, input.GuildDiscordID, target.DiscordID, target.ToSpec()); err != nil {
                return wrapDiscordErr(err)
            }
        }
    }

    return nil
}

Idempotency

Activity 는 재시도 가능 → idempotent 설계 필수:

  • Discord API 는 대부분 idempotent (role 이름으로 create → 있으면 200, 없으면 201)
  • DB 는 ON CONFLICT, GREATEST 등 사용
  • 외부 결제는 orderId 로 중복 거부

Error classification

// Retryable (일시 장애)
return fmt.Errorf("discord 429: %w", err)

// Non-retryable (business 오류)
return temporal.NewApplicationError(
    "snapshot is not valid",
    "InvalidSnapshot",  // type name → retry policy 의 NonRetryableErrorTypes 와 매칭
)

Heartbeat + cancellation

func (a *Activities) BulkProcessActivity(ctx context.Context, items []Item) error {
    for i, item := range items {
        // Cancellation check
        if err := ctx.Err(); err != nil { return err }

        if err := a.processItem(ctx, item); err != nil { return err }

        // Heartbeat every 10 items
        if i%10 == 0 {
            activity.RecordHeartbeat(ctx, i)
        }
    }
    return nil
}

Worker registration

// apps/worker/internal/workflow/register.go
func RegisterRestoreWorker(w worker.Worker, activities *restore.Activities) {
    w.RegisterWorkflow(restore.RestoreWorkflow)
    w.RegisterActivity(activities.ValidateSnapshotActivity)
    w.RegisterActivity(activities.CreatePreRestoreSnapshotActivity)
    w.RegisterActivity(activities.PauseLiveSyncActivity)
    w.RegisterActivity(activities.ResumeLiveSyncActivity)
    w.RegisterActivity(activities.DeleteRemovedOverridesActivity)
    // ... 모든 activity
}

// main.go
restoreWorker := worker.New(c, "umbra-recovery-restore", worker.Options{
    MaxConcurrentActivityExecutionSize: 10,
})
RegisterRestoreWorker(restoreWorker, restoreActivities)
if err := restoreWorker.Start(); err != nil { ... }

Starting workflow

// engine/recovery/subdomain/restore/app/service.go
func (s *Service) StartRestore(ctx context.Context, input StartInput) (*RestoreJob, error) {
    // ... create RestoreJob in DB

    workflowID := fmt.Sprintf("restore-%s", job.ID)
    options := client.StartWorkflowOptions{
        ID:        workflowID,
        TaskQueue: "umbra-recovery-restore",
        // Idempotent: 같은 ID 로 두 번째 시작은 거부
        WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
    }

    we, err := s.temporal.ExecuteWorkflow(ctx, options, RestoreWorkflow, RestoreInput{
        JobID:      job.ID,
        SnapshotID: input.SnapshotID,
        GuildID:    input.GuildID,
        Scope:      input.Scope,
        Options:    input.Options,
    })
    if err != nil { return nil, err }

    job.TemporalWorkflowID = &we.GetID()
    s.jobRepo.Update(ctx, job)
    return job, nil
}

Querying & signaling

// Query (external status read)
var windowSizes map[string]int
resp, err := temporalClient.QueryWorkflow(ctx, workflowID, "", "getWindowSizes")
if err == nil { _ = resp.Get(&windowSizes) }

// Signal (external event delivery)
err := temporalClient.SignalWorkflow(ctx, workflowID, "", "event", eventSignal)

Versioning

Workflow 변경 시 이미 running 중인 instances 와 호환 유지:

func MyWorkflow(ctx workflow.Context, input Input) error {
    v := workflow.GetVersion(ctx, "new-step", workflow.DefaultVersion, 1)
    if v == workflow.DefaultVersion {
        // Old behavior
        workflow.ExecuteActivity(ctx, OldActivity, ...).Get(ctx, nil)
    } else {
        // New behavior
        workflow.ExecuteActivity(ctx, NewActivity, ...).Get(ctx, nil)
    }
    // ... common code
}

버전 분기는 점진 제거 (모든 running workflow 완료 후).

Testing

Workflow unit test

Temporal 의 testing SDK 사용:

func TestRestoreWorkflow_Success(t *testing.T) {
    ts := &testsuite.WorkflowTestSuite{}
    env := ts.NewTestWorkflowEnvironment()

    env.OnActivity(ValidateSnapshotActivity, mock.Anything, mock.Anything).
        Return(SnapshotData{...}, nil)
    env.OnActivity(PauseLiveSyncActivity, mock.Anything, mock.Anything).
        Return(nil)
    env.OnActivity(CreateOrUpdateRolesActivity, mock.Anything, mock.Anything).
        Return(nil)
    // ... 모든 activity mock
    env.OnActivity(ResumeLiveSyncActivity, mock.Anything, mock.Anything, mock.Anything).
        Return(nil)

    env.ExecuteWorkflow(RestoreWorkflow, RestoreInput{...})

    assert.True(t, env.IsWorkflowCompleted())
    assert.NoError(t, env.GetWorkflowError())
}

Activity unit test

일반 Go 함수라 평범하게 테스트.

Observability

Temporal Web UI

  • http://localhost:8233 (local)
  • Workflow 진행 상황, event history, activity stacktrace 실시간 조회
  • Debugging 에 매우 유용

Search attribute

Workflow 시작 시 메타데이터 추가:

options := client.StartWorkflowOptions{
    ID: workflowID,
    SearchAttributes: map[string]any{
        "guildId":  input.GuildID.String(),
        "scope":    strings.Join(input.Scope, ","),
    },
}

Web UI 에서 guildId 로 필터링 가능.

Metrics

OpenTelemetry 자동 수집:

  • Workflow execution duration
  • Activity retry count
  • Task queue backlog

Do / Don't

Do

  • ✅ Workflow 는 deterministic
  • ✅ IO 는 Activity 로
  • workflow.Now, workflow.NewTimer 사용
  • ✅ Activity 는 idempotent
  • ✅ Retry policy 명시
  • ✅ Heartbeat for long activities
  • ✅ Disconnected context for cleanup
  • WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE 로 중복 방지

Don't

  • ❌ Workflow 에서 time.Now(), rand, HTTP, DB
  • go func() (workflow 내 goroutine)
  • ❌ Workflow 변경 후 versioning 없이 즉시 배포
  • ❌ Activity 가 non-idempotent
  • ❌ 긴 Activity heartbeat 없이

See also

  • asynq-jobs.md — 단기 작업용
  • outbox-pattern.md — 이벤트 전달
  • backend-conventions.md
  • ../domain/recovery/restore.md — workflow 사용례
  • ../domain/recovery/antinuke.md — signal 기반 workflow
  • ../adr/0014-recovery-temporal-workflow.md
  • ../adr/0015-asynq-vs-temporal-split.md