diff options
Diffstat (limited to 'plugins/temporal/workflow/workflow_pool.go')
-rw-r--r-- | plugins/temporal/workflow/workflow_pool.go | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go index 2022b624..1a78f377 100644 --- a/plugins/temporal/workflow/workflow_pool.go +++ b/plugins/temporal/workflow/workflow_pool.go @@ -19,41 +19,35 @@ import ( const eventWorkerExit = 8390 -type ( - workflowPool interface { - SeqID() uint64 - Exec(p payload.Payload) (payload.Payload, error) - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.BaseProcess - WorkflowNames() []string - } +type workflowPool interface { + SeqID() uint64 + Exec(p payload.Payload) (payload.Payload, error) + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.BaseProcess + WorkflowNames() []string +} - // PoolEvent triggered on workflow pool worker events. - PoolEvent struct { - Event int - Context interface{} - Caused error - } +// PoolEvent triggered on workflow pool worker events. +type PoolEvent struct { + Event int + Context interface{} + Caused error +} - // workflowPoolImpl manages workflowProcess executions between worker restarts. - workflowPoolImpl struct { - codec rrt.Codec - seqID uint64 - workflows map[string]rrt.WorkflowInfo - tWorkers []worker.Worker - mu sync.Mutex - worker rrWorker.SyncWorker - active bool - } -) +// workflowPoolImpl manages workflowProcess executions between worker restarts. +type workflowPoolImpl struct { + codec rrt.Codec + seqID uint64 + workflows map[string]rrt.WorkflowInfo + tWorkers []worker.Worker + mu sync.Mutex + worker rrWorker.SyncWorker + active bool +} // newWorkflowPool creates new workflow pool. -func newWorkflowPool( - codec rrt.Codec, - listener events.Listener, - factory server.Server, -) (workflowPool, error) { +func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { w, err := factory.NewWorker( context.Background(), map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, @@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string { // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_init_workers") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) if err != nil { - return err + return errors.E(op, err) } pool.workflows = make(map[string]rrt.WorkflowInfo) @@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T for _, info := range workerInfo { w, err := temporal.CreateWorker(info.TaskQueue, info.Options) if err != nil { - return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx)) + return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) |