Temporal Workflow Guide¶
Umbra 가 Temporal 을 사용하는 방식과 실무 패턴. Recovery domain 의 restore / antinuke workflow 작성 가이드를 중심으로 한다. asynq 와 대비되는 사용 기준은
asynq-jobs.md참조.
Why Temporal¶
Recovery 는 장기 실행 + 재시작 안전성 + 단계별 재시도가 필요하다. 자체 구현하면 상태 머신과 retry 로직을 수동으로 만들어야 한다. Temporal 은 이를 프레임워크로 제공 (ADR-0014).
핵심 보장:
- Durable execution — workflow 프로세스가 죽어도 상태 유지, 재시작 후 이어서 실행
- Automatic retry — Activity 는 configurable backoff 로 자동 재시도
- Visibility — Temporal Web UI 로 실시간 진행 조회
- Idempotency guidance — Workflow/Activity 설계가 자연스럽게 idempotent 해짐
When to use¶
Temporal 적합¶
- 복구 (Recovery) — 수십 초 ~ 수 분, 단계별 재시도 필요
- AntiNuke — 길드당 장기 running 감지기
- Phase 2+: 장기 사용자 journey (예: 7일 trial → 결제 유도)
Temporal 부적합 (asynq 사용)¶
- 짧은 백그라운드 작업 (수 초)
- cron 기반 정기 실행 (일일 스냅샷, 결제 cron)
- Outbox poller, Notification 발송
- Live Sync 배치
상세 비교는 asynq-jobs.md 와 ADR-0015.
Architecture¶
컴포넌트¶
- Temporal Server — Neon Postgres 를 backing store 로 self-host (Phase 2 에서 Temporal Cloud 검토)
- Worker —
apps/worker가 workflow 와 activity 를 실행 - Client —
apps/api와 도메인 서비스가 workflow 시작/신호 전송
Namespace¶
umbra-local, umbra-prod 등 환경별 namespace 분리.
Task queues¶
umbra-recovery-restore— Restore workflowumbra-recovery-antinuke— AntiNuke workflow
향후 도메인 확장 시 추가.
Workflow 작성 규칙¶
Determinism (가장 중요)¶
Workflow 는 결정적 (deterministic) 이어야 한다. Temporal 이 event history 를 replay 하여 상태를 복원하므로, 같은 history 로 실행 시 항상 같은 결과가 나와야 함.
Workflow 에서 금지:
// ❌ Non-deterministic
time.Now() // 실제 시각은 replay 시 다름
rand.Intn(10) // 랜덤 값
http.Get(...) // 외부 IO
os.ReadFile(...) // 파일 IO
uuid.NewV7() // UUID 생성 (시간 기반)
go func() { ... }() // 병렬 goroutine
time.Sleep(d) // 일반 sleep
channel send/receive 에 select timeout 외 사용
Workflow 에서 허용:
workflow.Now(ctx) // deterministic time
workflow.NewTimer(ctx, duration) // deterministic sleep
workflow.SideEffect(ctx, func() any {...}) // non-det 값 한 번만 계산 후 replay
workflow.ExecuteActivity(ctx, fn, args...) // 외부 IO 는 Activity 로
workflow.NewSelector(ctx) // event-driven 흐름
workflow.ExecuteChildWorkflow(ctx, ...) // 하위 workflow
Activity 로 분리¶
외부 IO, 시간 의존, random 은 모두 Activity 로:
// Workflow
func RestoreWorkflow(ctx workflow.Context, input RestoreInput) error {
var snap SnapshotData
err := workflow.ExecuteActivity(ctx, ValidateSnapshotActivity, input.SnapshotID).Get(ctx, &snap)
if err != nil { return err }
...
}
// Activity (일반 Go 함수, IO 허용)
func ValidateSnapshotActivity(ctx context.Context, snapshotID uuid.UUID) (SnapshotData, error) {
snap, err := repo.GetByID(ctx, snapshotID)
if err != nil { return SnapshotData{}, err }
if !snap.IsValid() { return SnapshotData{}, ErrInvalidSnapshot }
return snap.ToActivityResult(), nil
}
Retry policy¶
ao := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 10,
NonRetryableErrorTypes: []string{
"PermissionDenied",
"GuildNotFound",
"InvalidSnapshot",
},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
NonRetryableErrorTypes— business logic 오류는 재시도 무의미StartToCloseTimeout— 단일 실행 타임아웃HeartbeatTimeout— 장기 실행 Activity 가 살아있음을 증명
Heartbeat¶
장기 실행 Activity 는 주기적으로 heartbeat:
func LongActivity(ctx context.Context, input Input) error {
for _, item := range input.Items {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
processItem(item)
activity.RecordHeartbeat(ctx, item.ID) // 진행 상태 전달
}
return nil
}
Heartbeat 없이 오래 걸리면 Temporal 이 timeout 판정 → retry.
패턴: Signal 기반 AntiNuke workflow¶
AntiNuke 는 길드당 장기 running workflow 가 signal 수신하며 rolling window 관리:
func AntiNukeWorkflow(ctx workflow.Context, guildID uuid.UUID) error {
logger := workflow.GetLogger(ctx)
windows := newRollingWindows(defaultThresholds())
antinukeEnabled := true
// Query handler (외부에서 현재 상태 조회)
workflow.SetQueryHandler(ctx, "getWindowSizes", func() (map[string]int, error) {
result := map[string]int{}
for p, w := range windows {
result[string(p)] = w.Len()
}
return result, nil
})
for {
selector := workflow.NewSelector(ctx)
// Event signal
selector.AddReceive(
workflow.GetSignalChannel(ctx, "event"),
func(c workflow.ReceiveChannel, more bool) {
var ev EventSignal
c.Receive(ctx, &ev)
if !antinukeEnabled { return }
window := windows[ev.Pattern]
window.Push(ev)
window.EvictOld(workflow.Now(ctx))
if window.Len() >= window.Threshold() {
respondToDetection(ctx, guildID, ev.Pattern, window)
window.Clear()
}
},
)
// Config update signal
selector.AddReceive(
workflow.GetSignalChannel(ctx, "config_update"),
func(c workflow.ReceiveChannel, more bool) {
var cfg ConfigUpdate
c.Receive(ctx, &cfg)
antinukeEnabled = cfg.Enabled
for pattern, threshold := range cfg.Thresholds {
windows[pattern].UpdateThreshold(threshold)
}
},
)
// Shutdown signal
selector.AddReceive(
workflow.GetSignalChannel(ctx, "shutdown"),
func(c workflow.ReceiveChannel, more bool) {
var _ any
c.Receive(ctx, &_)
logger.Info("antinuke workflow shutting down", "guild_id", guildID)
},
)
// Periodic evict timer
selector.AddFuture(
workflow.NewTimer(ctx, time.Minute),
func(f workflow.Future) {
now := workflow.Now(ctx)
for _, w := range windows {
w.EvictOld(now)
}
},
)
selector.Select(ctx)
}
}
외부에서 signal 전송:
// Go client
err := temporalClient.SignalWorkflow(ctx, workflowID, "", "event", EventSignal{
Pattern: MassRoleDelete,
Timestamp: time.Now(),
AggregateID: roleID,
})
패턴: Sequential workflow¶
Restore 처럼 단계별로 실행되는 workflow:
func RestoreWorkflow(ctx workflow.Context, input RestoreInput) (*RestoreResult, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
RetryPolicy: restoreRetryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 1. Validate
var snap SnapshotData
if err := workflow.ExecuteActivity(ctx, ValidateSnapshotActivity, input.SnapshotID).Get(ctx, &snap); err != nil {
return nil, err
}
// 2. Optional pre-restore snapshot
if input.Options.CreatePreRestoreSnapshot {
var preID uuid.UUID
if err := workflow.ExecuteActivity(ctx, CreatePreRestoreSnapshotActivity, input.GuildID).Get(ctx, &preID); err != nil {
workflow.GetLogger(ctx).Warn("pre-restore snapshot failed", "error", err)
}
}
// 3. Pause Live Sync with defer resume
if err := workflow.ExecuteActivity(ctx, PauseLiveSyncActivity, input.GuildID).Get(ctx, nil); err != nil {
return nil, err
}
defer func() {
// Disconnected context 로 cancellation 에도 실행
disConnCtx, _ := workflow.NewDisconnectedContext(ctx)
_ = workflow.ExecuteActivity(disConnCtx, ResumeLiveSyncActivity, input.GuildID, true).Get(disConnCtx, nil)
}()
// 4. Delete phase (역순)
if contains(input.Scope, "permission_overrides") {
workflow.ExecuteActivity(ctx, DeleteRemovedOverridesActivity, input).Get(ctx, nil)
}
if contains(input.Scope, "channels") {
workflow.ExecuteActivity(ctx, DeleteRemovedChannelsActivity, input).Get(ctx, nil)
}
if contains(input.Scope, "roles") {
workflow.ExecuteActivity(ctx, DeleteRemovedRolesActivity, input).Get(ctx, nil)
}
// 5. Create/Update phase (정순)
if contains(input.Scope, "roles") {
workflow.ExecuteActivity(ctx, CreateOrUpdateRolesActivity, input).Get(ctx, nil)
}
if contains(input.Scope, "channels") {
workflow.ExecuteActivity(ctx, CreateOrUpdateChannelsActivity, input).Get(ctx, nil)
}
if contains(input.Scope, "permission_overrides") {
workflow.ExecuteActivity(ctx, ApplyPermissionOverridesActivity, input).Get(ctx, nil)
}
if contains(input.Scope, "guild_settings") {
workflow.ExecuteActivity(ctx, UpdateGuildSettingsActivity, input).Get(ctx, nil)
}
// 6. Finalize
result := &RestoreResult{CompletedAt: workflow.Now(ctx)}
workflow.ExecuteActivity(ctx, FinalizeRestoreJobActivity, input.JobID, result).Get(ctx, nil)
return result, nil
}
Cancellation¶
Workflow 가 cancel 될 수 있음 (user cancel, guild deleted 등).
func MyWorkflow(ctx workflow.Context, input Input) error {
// Cancel 시 진행 중 activity 는 완료 허용
err := workflow.ExecuteActivity(ctx, SomeActivity, input).Get(ctx, nil)
if err != nil {
if temporal.IsCanceledError(err) {
// cleanup
disConnCtx, _ := workflow.NewDisconnectedContext(ctx)
workflow.ExecuteActivity(disConnCtx, CleanupActivity, input).Get(disConnCtx, nil)
}
return err
}
return nil
}
Disconnected context 의 주 용도: cancellation 에도 cleanup 보장.
Activity 작성 규칙¶
일반 Go 함수¶
Activity 는 순수 Go 함수. 외부 IO 자유롭게 사용.
type Activities struct {
snapshots snapshot.Service
discord discord.Client
sync sync.Service
logger *slog.Logger
}
func (a *Activities) CreateOrUpdateRolesActivity(ctx context.Context, input RestoreInput) error {
snap, err := a.snapshots.GetByID(ctx, input.SnapshotID)
if err != nil { return err }
current, err := a.discord.FetchRoles(ctx, input.GuildDiscordID)
if err != nil { return err }
for _, target := range snap.Payload.Roles {
existing := findRole(current, target.DiscordID)
if existing == nil {
if _, err := a.discord.CreateRole(ctx, input.GuildDiscordID, target.ToSpec()); err != nil {
return wrapDiscordErr(err)
}
} else if !equalRole(existing, target) {
if err := a.discord.UpdateRole(ctx, input.GuildDiscordID, target.DiscordID, target.ToSpec()); err != nil {
return wrapDiscordErr(err)
}
}
}
return nil
}
Idempotency¶
Activity 는 재시도 가능 → idempotent 설계 필수:
- Discord API 는 대부분 idempotent (role 이름으로 create → 있으면 200, 없으면 201)
- DB 는
ON CONFLICT,GREATEST등 사용 - 외부 결제는 orderId 로 중복 거부
Error classification¶
// Retryable (일시 장애)
return fmt.Errorf("discord 429: %w", err)
// Non-retryable (business 오류)
return temporal.NewApplicationError(
"snapshot is not valid",
"InvalidSnapshot", // type name → retry policy 의 NonRetryableErrorTypes 와 매칭
)
Heartbeat + cancellation¶
func (a *Activities) BulkProcessActivity(ctx context.Context, items []Item) error {
for i, item := range items {
// Cancellation check
if err := ctx.Err(); err != nil { return err }
if err := a.processItem(ctx, item); err != nil { return err }
// Heartbeat every 10 items
if i%10 == 0 {
activity.RecordHeartbeat(ctx, i)
}
}
return nil
}
Worker registration¶
// apps/worker/internal/workflow/register.go
func RegisterRestoreWorker(w worker.Worker, activities *restore.Activities) {
w.RegisterWorkflow(restore.RestoreWorkflow)
w.RegisterActivity(activities.ValidateSnapshotActivity)
w.RegisterActivity(activities.CreatePreRestoreSnapshotActivity)
w.RegisterActivity(activities.PauseLiveSyncActivity)
w.RegisterActivity(activities.ResumeLiveSyncActivity)
w.RegisterActivity(activities.DeleteRemovedOverridesActivity)
// ... 모든 activity
}
// main.go
restoreWorker := worker.New(c, "umbra-recovery-restore", worker.Options{
MaxConcurrentActivityExecutionSize: 10,
})
RegisterRestoreWorker(restoreWorker, restoreActivities)
if err := restoreWorker.Start(); err != nil { ... }
Starting workflow¶
// engine/recovery/subdomain/restore/app/service.go
func (s *Service) StartRestore(ctx context.Context, input StartInput) (*RestoreJob, error) {
// ... create RestoreJob in DB
workflowID := fmt.Sprintf("restore-%s", job.ID)
options := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: "umbra-recovery-restore",
// Idempotent: 같은 ID 로 두 번째 시작은 거부
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
}
we, err := s.temporal.ExecuteWorkflow(ctx, options, RestoreWorkflow, RestoreInput{
JobID: job.ID,
SnapshotID: input.SnapshotID,
GuildID: input.GuildID,
Scope: input.Scope,
Options: input.Options,
})
if err != nil { return nil, err }
job.TemporalWorkflowID = &we.GetID()
s.jobRepo.Update(ctx, job)
return job, nil
}
Querying & signaling¶
// Query (external status read)
var windowSizes map[string]int
resp, err := temporalClient.QueryWorkflow(ctx, workflowID, "", "getWindowSizes")
if err == nil { _ = resp.Get(&windowSizes) }
// Signal (external event delivery)
err := temporalClient.SignalWorkflow(ctx, workflowID, "", "event", eventSignal)
Versioning¶
Workflow 변경 시 이미 running 중인 instances 와 호환 유지:
func MyWorkflow(ctx workflow.Context, input Input) error {
v := workflow.GetVersion(ctx, "new-step", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
// Old behavior
workflow.ExecuteActivity(ctx, OldActivity, ...).Get(ctx, nil)
} else {
// New behavior
workflow.ExecuteActivity(ctx, NewActivity, ...).Get(ctx, nil)
}
// ... common code
}
버전 분기는 점진 제거 (모든 running workflow 완료 후).
Testing¶
Workflow unit test¶
Temporal 의 testing SDK 사용:
func TestRestoreWorkflow_Success(t *testing.T) {
ts := &testsuite.WorkflowTestSuite{}
env := ts.NewTestWorkflowEnvironment()
env.OnActivity(ValidateSnapshotActivity, mock.Anything, mock.Anything).
Return(SnapshotData{...}, nil)
env.OnActivity(PauseLiveSyncActivity, mock.Anything, mock.Anything).
Return(nil)
env.OnActivity(CreateOrUpdateRolesActivity, mock.Anything, mock.Anything).
Return(nil)
// ... 모든 activity mock
env.OnActivity(ResumeLiveSyncActivity, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
env.ExecuteWorkflow(RestoreWorkflow, RestoreInput{...})
assert.True(t, env.IsWorkflowCompleted())
assert.NoError(t, env.GetWorkflowError())
}
Activity unit test¶
일반 Go 함수라 평범하게 테스트.
Observability¶
Temporal Web UI¶
- http://localhost:8233 (local)
- Workflow 진행 상황, event history, activity stacktrace 실시간 조회
- Debugging 에 매우 유용
Search attribute¶
Workflow 시작 시 메타데이터 추가:
options := client.StartWorkflowOptions{
ID: workflowID,
SearchAttributes: map[string]any{
"guildId": input.GuildID.String(),
"scope": strings.Join(input.Scope, ","),
},
}
Web UI 에서 guildId 로 필터링 가능.
Metrics¶
OpenTelemetry 자동 수집:
- Workflow execution duration
- Activity retry count
- Task queue backlog
Do / Don't¶
Do¶
- ✅ Workflow 는 deterministic
- ✅ IO 는 Activity 로
- ✅
workflow.Now,workflow.NewTimer사용 - ✅ Activity 는 idempotent
- ✅ Retry policy 명시
- ✅ Heartbeat for long activities
- ✅ Disconnected context for cleanup
- ✅
WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE로 중복 방지
Don't¶
- ❌ Workflow 에서
time.Now(),rand, HTTP, DB - ❌
go func()(workflow 내 goroutine) - ❌ Workflow 변경 후 versioning 없이 즉시 배포
- ❌ Activity 가 non-idempotent
- ❌ 긴 Activity heartbeat 없이
See also¶
asynq-jobs.md— 단기 작업용outbox-pattern.md— 이벤트 전달backend-conventions.md../domain/recovery/restore.md— workflow 사용례../domain/recovery/antinuke.md— signal 기반 workflow../adr/0014-recovery-temporal-workflow.md../adr/0015-asynq-vs-temporal-split.md