summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/workflow_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/workflow/workflow_pool.go')
-rw-r--r--plugins/temporal/workflow/workflow_pool.go61
1 files changed, 28 insertions, 33 deletions
diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go
index 2022b624..1a78f377 100644
--- a/plugins/temporal/workflow/workflow_pool.go
+++ b/plugins/temporal/workflow/workflow_pool.go
@@ -19,41 +19,35 @@ import (
const eventWorkerExit = 8390
-type (
- workflowPool interface {
- SeqID() uint64
- Exec(p payload.Payload) (payload.Payload, error)
- Start(ctx context.Context, temporal client.Temporal) error
- Destroy(ctx context.Context) error
- Workers() []rrWorker.BaseProcess
- WorkflowNames() []string
- }
+type workflowPool interface {
+ SeqID() uint64
+ Exec(p payload.Payload) (payload.Payload, error)
+ Start(ctx context.Context, temporal client.Temporal) error
+ Destroy(ctx context.Context) error
+ Workers() []rrWorker.BaseProcess
+ WorkflowNames() []string
+}
- // PoolEvent triggered on workflow pool worker events.
- PoolEvent struct {
- Event int
- Context interface{}
- Caused error
- }
+// PoolEvent triggered on workflow pool worker events.
+type PoolEvent struct {
+ Event int
+ Context interface{}
+ Caused error
+}
- // workflowPoolImpl manages workflowProcess executions between worker restarts.
- workflowPoolImpl struct {
- codec rrt.Codec
- seqID uint64
- workflows map[string]rrt.WorkflowInfo
- tWorkers []worker.Worker
- mu sync.Mutex
- worker rrWorker.SyncWorker
- active bool
- }
-)
+// workflowPoolImpl manages workflowProcess executions between worker restarts.
+type workflowPoolImpl struct {
+ codec rrt.Codec
+ seqID uint64
+ workflows map[string]rrt.WorkflowInfo
+ tWorkers []worker.Worker
+ mu sync.Mutex
+ worker rrWorker.SyncWorker
+ active bool
+}
// newWorkflowPool creates new workflow pool.
-func newWorkflowPool(
- codec rrt.Codec,
- listener events.Listener,
- factory server.Server,
-) (workflowPool, error) {
+func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) {
w, err := factory.NewWorker(
context.Background(),
map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()},
@@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string {
// initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool.
func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error {
+ const op = errors.Op("workflow_pool_init_workers")
workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter())
if err != nil {
- return err
+ return errors.E(op, err)
}
pool.workflows = make(map[string]rrt.WorkflowInfo)
@@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T
for _, info := range workerInfo {
w, err := temporal.CreateWorker(info.TaskQueue, info.Options)
if err != nil {
- return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx))
+ return errors.E(op, err, pool.Destroy(ctx))
}
pool.tWorkers = append(pool.tWorkers, w)