diff options
Diffstat (limited to 'plugins/temporal/workflow/process.go')
-rw-r--r-- | plugins/temporal/workflow/process.go | 33 |
1 files changed, 7 insertions, 26 deletions
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go index ec5a14eb..45e6885c 100644 --- a/plugins/temporal/workflow/process.go +++ b/plugins/temporal/workflow/process.go @@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com lastCompletionOffset = len(lastCompletion.Payloads) } - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.StartWorkflow{ Info: env.WorkflowInfo(), LastCompletion: lastCompletionOffset, }, input, ) - - if err != nil { - panic(err) - } } // OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. @@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() { // TODO: properly handle errors // panic(err) - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - if err != nil { - panic(err) - } - _, _ = wf.discardQueue() } @@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context { // schedule cancel command func (wf *workflowProcess) handleCancel() { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, nil, ) - - if err != nil { - panic(err) - } } // schedule the signal processing func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { - _, err := wf.mq.pushCommand( + _ = wf.mq.pushCommand( rrt.InvokeSignal{ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, Name: name, }, input, ) - - if err != nil { - panic(err) - } } // Handle query in blocking mode. @@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { // Run single command and return single result. func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { - const op = errors.Op("run command") - _, msg, err := wf.mq.allocateMessage(cmd, payloads) - if err != nil { - return rrt.Message{}, err - } + const op = errors.Op("workflow_process_runcommand") + _, msg := wf.mq.allocateMessage(cmd, payloads) result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) if err != nil { @@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa } if len(result) != 1 { - return rrt.Message{}, errors.E("unexpected worker response") + return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) } return result[0], nil |