diff options
author | Valery Piashchynski <[email protected]> | 2021-01-26 01:06:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-26 01:06:16 +0300 |
commit | 4638bdca80f75bc120b330022086d31c8b41be5b (patch) | |
tree | 2362cdb39dc2e793f5bec7fd9b8d2363f516c1d4 /plugins/temporal/workflow | |
parent | 7756eb25453c8006fbd75aa5c97159e96331b840 (diff) |
Code cleanup
Diffstat (limited to 'plugins/temporal/workflow')
-rw-r--r-- | plugins/temporal/workflow/message_queue.go | 20 | ||||
-rw-r--r-- | plugins/temporal/workflow/message_queue_test.go | 4 | ||||
-rw-r--r-- | plugins/temporal/workflow/plugin.go | 37 | ||||
-rw-r--r-- | plugins/temporal/workflow/process.go | 33 | ||||
-rw-r--r-- | plugins/temporal/workflow/workflow_pool.go | 61 |
5 files changed, 62 insertions, 93 deletions
diff --git a/plugins/temporal/workflow/message_queue.go b/plugins/temporal/workflow/message_queue.go index 50949897..8f4409d1 100644 --- a/plugins/temporal/workflow/message_queue.go +++ b/plugins/temporal/workflow/message_queue.go @@ -22,28 +22,20 @@ func (mq *messageQueue) flush() { mq.queue = mq.queue[0:0] } -func (mq *messageQueue) allocateMessage( - cmd interface{}, - payloads *common.Payloads, -) (id uint64, msg rrt.Message, err error) { - msg = rrt.Message{ +func (mq *messageQueue) allocateMessage(cmd interface{}, payloads *common.Payloads) (uint64, rrt.Message) { + msg := rrt.Message{ ID: mq.seqID(), Command: cmd, Payloads: payloads, } - return msg.ID, msg, nil + return msg.ID, msg } -func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) (id uint64, err error) { - id, msg, err := mq.allocateMessage(cmd, payloads) - if err != nil { - return 0, err - } - +func (mq *messageQueue) pushCommand(cmd interface{}, payloads *common.Payloads) uint64 { + id, msg := mq.allocateMessage(cmd, payloads) mq.queue = append(mq.queue, msg) - - return id, nil + return id } func (mq *messageQueue) pushResponse(id uint64, payloads *common.Payloads) { diff --git a/plugins/temporal/workflow/message_queue_test.go b/plugins/temporal/workflow/message_queue_test.go index 61f5123f..1fcd409f 100644 --- a/plugins/temporal/workflow/message_queue_test.go +++ b/plugins/temporal/workflow/message_queue_test.go @@ -44,10 +44,8 @@ func Test_MessageQueueCommandID(t *testing.T) { return atomic.AddUint64(&index, 1) }) - n, err := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) + n := mq.pushCommand(protocol.StartWorkflow{}, &common.Payloads{}) assert.Equal(t, n, index) - - assert.NoError(t, err) assert.Len(t, mq.queue, 1) mq.flush() diff --git a/plugins/temporal/workflow/plugin.go b/plugins/temporal/workflow/plugin.go index ee2d722c..3a397364 100644 --- a/plugins/temporal/workflow/plugin.go +++ b/plugins/temporal/workflow/plugin.go @@ -41,18 +41,19 @@ func (p *Plugin) Init(temporal client.Temporal, server server.Server, log logger p.server = server p.events = events.NewEventsHandler() p.log = log - p.reset = make(chan struct{}) + p.reset = make(chan struct{}, 1) return nil } // Serve starts workflow service. func (p *Plugin) Serve() chan error { + const op = errors.Op("workflow_plugin_serve") errCh := make(chan error, 1) pool, err := p.startPool() if err != nil { - errCh <- errors.E("startPool", err) + errCh <- errors.E(op, err) return errCh } @@ -76,7 +77,7 @@ func (p *Plugin) Serve() chan error { err = backoff.Retry(p.replacePool, bkoff) if err != nil { - errCh <- errors.E("deadPool", err) + errCh <- errors.E(op, err) } } } @@ -87,12 +88,17 @@ func (p *Plugin) Serve() chan error { // Stop workflow service. func (p *Plugin) Stop() error { + const op = errors.Op("workflow_plugin_stop") atomic.StoreInt64(&p.closing, 1) pool := p.getPool() if pool != nil { p.pool = nil - return pool.Destroy(context.Background()) + err := pool.Destroy(context.Background()) + if err != nil { + return errors.E(op, err) + } + return nil } return nil @@ -121,11 +127,6 @@ func (p *Plugin) Reset() error { } // AddListener adds event listeners to the service. -func (p *Plugin) AddListener(listener events.Listener) { - p.events.AddListener(listener) -} - -// AddListener adds event listeners to the service. func (p *Plugin) poolListener(event interface{}) { if ev, ok := event.(PoolEvent); ok { if ev.Event == eventWorkerExit { @@ -141,18 +142,19 @@ func (p *Plugin) poolListener(event interface{}) { // AddListener adds event listeners to the service. func (p *Plugin) startPool() (workflowPool, error) { + const op = errors.Op("workflow_plugin_start_pool") pool, err := newWorkflowPool( p.temporal.GetCodec().WithLogger(p.log), p.poolListener, p.server, ) if err != nil { - return nil, errors.E(errors.Op("initWorkflowPool"), err) + return nil, errors.E(op, err) } err = pool.Start(context.Background(), p.temporal) if err != nil { - return nil, errors.E(errors.Op("startWorkflowPool"), err) + return nil, errors.E(op, err) } p.log.Debug("Started workflow processing", "workflows", pool.WorkflowNames()) @@ -162,29 +164,30 @@ func (p *Plugin) startPool() (workflowPool, error) { func (p *Plugin) replacePool() error { p.mu.Lock() + const op = errors.Op("workflow_plugin_replace_pool") defer p.mu.Unlock() if p.pool != nil { - errD := p.pool.Destroy(context.Background()) + err := p.pool.Destroy(context.Background()) p.pool = nil - if errD != nil { + if err != nil { p.log.Error( "Unable to destroy expired workflow pool", "error", - errors.E(errors.Op("destroyWorkflowPool"), errD), + errors.E(op, err), ) + return errors.E(op, err) } } pool, err := p.startPool() if err != nil { p.log.Error("Replace workflow pool failed", "error", err) - return errors.E(errors.Op("newWorkflowPool"), err) + return errors.E(op, err) } - p.log.Debug("Replace workflow pool") - p.pool = pool + p.log.Debug("workflow pool successfully replaced") return nil } diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go index ec5a14eb..45e6885c 100644 --- a/plugins/temporal/workflow/process.go +++ b/plugins/temporal/workflow/process.go @@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com lastCompletionOffset = len(lastCompletion.Payloads) } - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.StartWorkflow{ Info: env.WorkflowInfo(), LastCompletion: lastCompletionOffset, }, input, ) - - if err != nil { - panic(err) - } } // OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. @@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() { // TODO: properly handle errors // panic(err) - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - if err != nil { - panic(err) - } - _, _ = wf.discardQueue() } @@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context { // schedule cancel command func (wf *workflowProcess) handleCancel() { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - - if err != nil { - panic(err) - } } // schedule the signal processing func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.InvokeSignal{ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, Name: name, }, input, ) - - if err != nil { - panic(err) - } } // Handle query in blocking mode. @@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { // Run single command and return single result. func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { - const op = errors.Op("run command") - _, msg, err := wf.mq.allocateMessage(cmd, payloads) - if err != nil { - return rrt.Message{}, err - } + const op = errors.Op("workflow_process_runcommand") + _, msg := wf.mq.allocateMessage(cmd, payloads) result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) if err != nil { @@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa } if len(result) != 1 { - return rrt.Message{}, errors.E("unexpected worker response") + return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) } return result[0], nil diff --git a/plugins/temporal/workflow/workflow_pool.go b/plugins/temporal/workflow/workflow_pool.go index 2022b624..1a78f377 100644 --- a/plugins/temporal/workflow/workflow_pool.go +++ b/plugins/temporal/workflow/workflow_pool.go @@ -19,41 +19,35 @@ import ( const eventWorkerExit = 8390 -type ( - workflowPool interface { - SeqID() uint64 - Exec(p payload.Payload) (payload.Payload, error) - Start(ctx context.Context, temporal client.Temporal) error - Destroy(ctx context.Context) error - Workers() []rrWorker.BaseProcess - WorkflowNames() []string - } +type workflowPool interface { + SeqID() uint64 + Exec(p payload.Payload) (payload.Payload, error) + Start(ctx context.Context, temporal client.Temporal) error + Destroy(ctx context.Context) error + Workers() []rrWorker.BaseProcess + WorkflowNames() []string +} - // PoolEvent triggered on workflow pool worker events. - PoolEvent struct { - Event int - Context interface{} - Caused error - } +// PoolEvent triggered on workflow pool worker events. +type PoolEvent struct { + Event int + Context interface{} + Caused error +} - // workflowPoolImpl manages workflowProcess executions between worker restarts. - workflowPoolImpl struct { - codec rrt.Codec - seqID uint64 - workflows map[string]rrt.WorkflowInfo - tWorkers []worker.Worker - mu sync.Mutex - worker rrWorker.SyncWorker - active bool - } -) +// workflowPoolImpl manages workflowProcess executions between worker restarts. +type workflowPoolImpl struct { + codec rrt.Codec + seqID uint64 + workflows map[string]rrt.WorkflowInfo + tWorkers []worker.Worker + mu sync.Mutex + worker rrWorker.SyncWorker + active bool +} // newWorkflowPool creates new workflow pool. -func newWorkflowPool( - codec rrt.Codec, - listener events.Listener, - factory server.Server, -) (workflowPool, error) { +func newWorkflowPool(codec rrt.Codec, listener events.Listener, factory server.Server) (workflowPool, error) { w, err := factory.NewWorker( context.Background(), map[string]string{"RR_MODE": RRMode, "RR_CODEC": codec.GetName()}, @@ -157,9 +151,10 @@ func (pool *workflowPoolImpl) WorkflowNames() []string { // initWorkers request workers workflows from underlying PHP and configures temporal workers linked to the pool. func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.Temporal) error { + const op = errors.Op("workflow_pool_init_workers") workerInfo, err := rrt.FetchWorkerInfo(pool.codec, pool, temporal.GetDataConverter()) if err != nil { - return err + return errors.E(op, err) } pool.workflows = make(map[string]rrt.WorkflowInfo) @@ -168,7 +163,7 @@ func (pool *workflowPoolImpl) initWorkers(ctx context.Context, temporal client.T for _, info := range workerInfo { w, err := temporal.CreateWorker(info.TaskQueue, info.Options) if err != nil { - return errors.E(errors.Op("createTemporalWorker"), err, pool.Destroy(ctx)) + return errors.E(op, err, pool.Destroy(ctx)) } pool.tWorkers = append(pool.tWorkers, w) |