diff options
Diffstat (limited to 'plugins/temporal/workflow/process.go')
-rw-r--r-- | plugins/temporal/workflow/process.go | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go new file mode 100644 index 00000000..45e6885c --- /dev/null +++ b/plugins/temporal/workflow/process.go @@ -0,0 +1,436 @@ +package workflow + +import ( + "strconv" + "sync/atomic" + "time" + + "github.com/spiral/errors" + rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol" + commonpb "go.temporal.io/api/common/v1" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +// wraps single workflow process +type workflowProcess struct { + codec rrt.Codec + pool workflowPool + env bindings.WorkflowEnvironment + header *commonpb.Header + mq *messageQueue + ids *idRegistry + seqID uint64 + runID string + pipeline []rrt.Message + callbacks []func() error + canceller *canceller + inLoop bool +} + +// Execute workflow, bootstraps process. +func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { + wf.env = env + wf.header = header + wf.seqID = 0 + wf.runID = env.WorkflowInfo().WorkflowExecution.RunID + wf.canceller = &canceller{} + + // sequenceID shared for all worker workflows + wf.mq = newMessageQueue(wf.pool.SeqID) + wf.ids = newIDRegistry() + + env.RegisterCancelHandler(wf.handleCancel) + env.RegisterSignalHandler(wf.handleSignal) + env.RegisterQueryHandler(wf.handleQuery) + + var ( + lastCompletion = bindings.GetLastCompletionResult(env) + lastCompletionOffset = 0 + ) + + if lastCompletion != nil && len(lastCompletion.Payloads) != 0 { + if input == nil { + input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}} + } + + input.Payloads = append(input.Payloads, lastCompletion.Payloads...) + lastCompletionOffset = len(lastCompletion.Payloads) + } + + _ = wf.mq.pushCommand( + rrt.StartWorkflow{ + Info: env.WorkflowInfo(), + LastCompletion: lastCompletionOffset, + }, + input, + ) +} + +// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server. +func (wf *workflowProcess) OnWorkflowTaskStarted() { + wf.inLoop = true + defer func() { wf.inLoop = false }() + + var err error + for _, callback := range wf.callbacks { + err = callback() + if err != nil { + panic(err) + } + } + wf.callbacks = nil + + if err := wf.flushQueue(); err != nil { + panic(err) + } + + for len(wf.pipeline) > 0 { + msg := wf.pipeline[0] + wf.pipeline = wf.pipeline[1:] + + if msg.IsCommand() { + err = wf.handleMessage(msg) + } + + if err != nil { + panic(err) + } + } +} + +// StackTrace renders workflow stack trace. +func (wf *workflowProcess) StackTrace() string { + result, err := wf.runCommand( + rrt.GetStackTrace{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + }, + nil, + ) + + if err != nil { + return err.Error() + } + + var stacktrace string + err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace) + if err != nil { + return err.Error() + } + + return stacktrace +} + +// Close the workflow. +func (wf *workflowProcess) Close() { + // TODO: properly handle errors + // panic(err) + + _ = wf.mq.pushCommand( + rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) + + _, _ = wf.discardQueue() +} + +// execution context. +func (wf *workflowProcess) getContext() rrt.Context { + return rrt.Context{ + TaskQueue: wf.env.WorkflowInfo().TaskQueueName, + TickTime: wf.env.Now().Format(time.RFC3339), + Replay: wf.env.IsReplaying(), + } +} + +// schedule cancel command +func (wf *workflowProcess) handleCancel() { + _ = wf.mq.pushCommand( + rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID}, + nil, + ) +} + +// schedule the signal processing +func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) { + _ = wf.mq.pushCommand( + rrt.InvokeSignal{ + RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID, + Name: name, + }, + input, + ) +} + +// Handle query in blocking mode. +func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) { + result, err := wf.runCommand( + rrt.InvokeQuery{ + RunID: wf.runID, + Name: queryType, + }, + queryArgs, + ) + + if err != nil { + return nil, err + } + + if result.Failure != nil { + return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter()) + } + + return result.Payloads, nil +} + +// process incoming command +func (wf *workflowProcess) handleMessage(msg rrt.Message) error { + const op = errors.Op("handleMessage") + var err error + + var ( + id = msg.ID + cmd = msg.Command + payloads = msg.Payloads + ) + + switch cmd := cmd.(type) { + case *rrt.ExecuteActivity: + params := cmd.ActivityParams(wf.env, payloads) + activityID := wf.env.ExecuteActivity(params, wf.createCallback(id)) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelActivity(activityID) + return nil + }) + + case *rrt.ExecuteChildWorkflow: + params := cmd.WorkflowParams(wf.env, payloads) + + // always use deterministic id + if params.WorkflowID == "" { + nextID := atomic.AddUint64(&wf.seqID, 1) + params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID)) + } + + wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) { + wf.ids.push(id, r, e) + }) + + wf.canceller.register(id, func() error { + wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID) + return nil + }) + + case *rrt.GetChildWorkflowExecution: + wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) { + cl := wf.createCallback(id) + + // TODO rewrite + if err != nil { + panic(err) + } + + p, err := wf.env.GetDataConverter().ToPayloads(w) + if err != nil { + panic(err) + } + + cl(p, err) + }) + + case *rrt.NewTimer: + timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id)) + wf.canceller.register(id, func() error { + if timerID != nil { + wf.env.RequestCancelTimer(*timerID) + } + return nil + }) + + case *rrt.GetVersion: + version := wf.env.GetVersion( + cmd.ChangeID, + workflow.Version(cmd.MinSupported), + workflow.Version(cmd.MaxSupported), + ) + + result, err := wf.env.GetDataConverter().ToPayloads(version) + if err != nil { + return errors.E(op, err) + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.SideEffect: + wf.env.SideEffect( + func() (*commonpb.Payloads, error) { + return payloads, nil + }, + wf.createContinuableCallback(id), + ) + + case *rrt.CompleteWorkflow: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + if msg.Failure == nil { + wf.env.Complete(payloads, nil) + } else { + wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter())) + } + + case *rrt.ContinueAsNew: + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + wf.env.Complete(nil, &workflow.ContinueAsNewError{ + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + Header: wf.header, + TaskQueueName: cmd.Options.TaskQueueName, + WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout, + WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout, + WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout, + }) + + case *rrt.SignalExternalWorkflow: + wf.env.SignalExternalWorkflow( + cmd.Namespace, + cmd.WorkflowID, + cmd.RunID, + cmd.Signal, + payloads, + nil, + cmd.ChildWorkflowOnly, + wf.createCallback(id), + ) + + case *rrt.CancelExternalWorkflow: + wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id)) + + case *rrt.Cancel: + err = wf.canceller.cancel(cmd.CommandIDs...) + if err != nil { + return errors.E(op, err) + } + + result, _ := wf.env.GetDataConverter().ToPayloads("completed") + wf.mq.pushResponse(id, result) + + err = wf.flushQueue() + if err != nil { + panic(err) + } + + case *rrt.Panic: + panic(errors.E(cmd.Message)) + + default: + panic("undefined command") + } + + return nil +} + +func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) error { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return nil + } + + // fetch original payload + wf.mq.pushResponse(id, result) + return nil + } + + return func(result *commonpb.Payloads, err error) { + // timer cancel callback can happen inside the loop + if wf.inLoop { + err := callback(result, err) + if err != nil { + panic(err) + } + + return + } + + wf.callbacks = append(wf.callbacks, func() error { + return callback(result, err) + }) + } +} + +// callback to be called inside the queue processing, adds new messages at the end of the queue +func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler { + callback := func(result *commonpb.Payloads, err error) { + wf.canceller.discard(id) + + if err != nil { + wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter())) + return + } + + wf.mq.pushResponse(id, result) + err = wf.flushQueue() + if err != nil { + panic(err) + } + } + + return func(result *commonpb.Payloads, err error) { + callback(result, err) + } +} + +// Exchange messages between host and worker processes and add new commands to the queue. +func (wf *workflowProcess) flushQueue() error { + const op = errors.Op("flush queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return errors.E(op, err) + } + + wf.pipeline = append(wf.pipeline, messages...) + + return nil +} + +// Exchange messages between host and worker processes without adding new commands to the queue. +func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) { + const op = errors.Op("discard queue") + messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...) + wf.mq.flush() + + if err != nil { + return nil, errors.E(op, err) + } + + return messages, nil +} + +// Run single command and return single result. +func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) { + const op = errors.Op("workflow_process_runcommand") + _, msg := wf.mq.allocateMessage(cmd, payloads) + + result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg) + if err != nil { + return rrt.Message{}, errors.E(op, err) + } + + if len(result) != 1 { + return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response")) + } + + return result[0], nil +} |