summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/process.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/workflow/process.go')
-rw-r--r--plugins/temporal/workflow/process.go33
1 files changed, 7 insertions, 26 deletions
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
index ec5a14eb..45e6885c 100644
--- a/plugins/temporal/workflow/process.go
+++ b/plugins/temporal/workflow/process.go
@@ -58,17 +58,13 @@ func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *com
lastCompletionOffset = len(lastCompletion.Payloads)
}
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
@@ -130,15 +126,11 @@ func (wf *workflowProcess) Close() {
// TODO: properly handle errors
// panic(err)
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
- if err != nil {
- panic(err)
- }
-
_, _ = wf.discardQueue()
}
@@ -153,29 +145,21 @@ func (wf *workflowProcess) getContext() rrt.Context {
// schedule cancel command
func (wf *workflowProcess) handleCancel() {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
nil,
)
-
- if err != nil {
- panic(err)
- }
}
// schedule the signal processing
func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _, err := wf.mq.pushCommand(
+ _ = wf.mq.pushCommand(
rrt.InvokeSignal{
RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
Name: name,
},
input,
)
-
- if err != nil {
- panic(err)
- }
}
// Handle query in blocking mode.
@@ -436,11 +420,8 @@ func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
// Run single command and return single result.
func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("run command")
- _, msg, err := wf.mq.allocateMessage(cmd, payloads)
- if err != nil {
- return rrt.Message{}, err
- }
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
if err != nil {
@@ -448,7 +429,7 @@ func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloa
}
if len(result) != 1 {
- return rrt.Message{}, errors.E("unexpected worker response")
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
}
return result[0], nil