summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/process.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/workflow/process.go')
-rw-r--r--plugins/temporal/workflow/process.go436
1 files changed, 436 insertions, 0 deletions
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
new file mode 100644
index 00000000..45e6885c
--- /dev/null
+++ b/plugins/temporal/workflow/process.go
@@ -0,0 +1,436 @@
+package workflow
+
+import (
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
+ commonpb "go.temporal.io/api/common/v1"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+// wraps single workflow process
+type workflowProcess struct {
+ codec rrt.Codec
+ pool workflowPool
+ env bindings.WorkflowEnvironment
+ header *commonpb.Header
+ mq *messageQueue
+ ids *idRegistry
+ seqID uint64
+ runID string
+ pipeline []rrt.Message
+ callbacks []func() error
+ canceller *canceller
+ inLoop bool
+}
+
+// Execute workflow, bootstraps process.
+func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
+ wf.env = env
+ wf.header = header
+ wf.seqID = 0
+ wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
+ wf.canceller = &canceller{}
+
+ // sequenceID shared for all worker workflows
+ wf.mq = newMessageQueue(wf.pool.SeqID)
+ wf.ids = newIDRegistry()
+
+ env.RegisterCancelHandler(wf.handleCancel)
+ env.RegisterSignalHandler(wf.handleSignal)
+ env.RegisterQueryHandler(wf.handleQuery)
+
+ var (
+ lastCompletion = bindings.GetLastCompletionResult(env)
+ lastCompletionOffset = 0
+ )
+
+ if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
+ if input == nil {
+ input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
+ }
+
+ input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
+ lastCompletionOffset = len(lastCompletion.Payloads)
+ }
+
+ _ = wf.mq.pushCommand(
+ rrt.StartWorkflow{
+ Info: env.WorkflowInfo(),
+ LastCompletion: lastCompletionOffset,
+ },
+ input,
+ )
+}
+
+// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
+func (wf *workflowProcess) OnWorkflowTaskStarted() {
+ wf.inLoop = true
+ defer func() { wf.inLoop = false }()
+
+ var err error
+ for _, callback := range wf.callbacks {
+ err = callback()
+ if err != nil {
+ panic(err)
+ }
+ }
+ wf.callbacks = nil
+
+ if err := wf.flushQueue(); err != nil {
+ panic(err)
+ }
+
+ for len(wf.pipeline) > 0 {
+ msg := wf.pipeline[0]
+ wf.pipeline = wf.pipeline[1:]
+
+ if msg.IsCommand() {
+ err = wf.handleMessage(msg)
+ }
+
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+// StackTrace renders workflow stack trace.
+func (wf *workflowProcess) StackTrace() string {
+ result, err := wf.runCommand(
+ rrt.GetStackTrace{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ },
+ nil,
+ )
+
+ if err != nil {
+ return err.Error()
+ }
+
+ var stacktrace string
+ err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
+ if err != nil {
+ return err.Error()
+ }
+
+ return stacktrace
+}
+
+// Close the workflow.
+func (wf *workflowProcess) Close() {
+ // TODO: properly handle errors
+ // panic(err)
+
+ _ = wf.mq.pushCommand(
+ rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+
+ _, _ = wf.discardQueue()
+}
+
+// execution context.
+func (wf *workflowProcess) getContext() rrt.Context {
+ return rrt.Context{
+ TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
+ TickTime: wf.env.Now().Format(time.RFC3339),
+ Replay: wf.env.IsReplaying(),
+ }
+}
+
+// schedule cancel command
+func (wf *workflowProcess) handleCancel() {
+ _ = wf.mq.pushCommand(
+ rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
+ nil,
+ )
+}
+
+// schedule the signal processing
+func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
+ _ = wf.mq.pushCommand(
+ rrt.InvokeSignal{
+ RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
+ Name: name,
+ },
+ input,
+ )
+}
+
+// Handle query in blocking mode.
+func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
+ result, err := wf.runCommand(
+ rrt.InvokeQuery{
+ RunID: wf.runID,
+ Name: queryType,
+ },
+ queryArgs,
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ if result.Failure != nil {
+ return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
+ }
+
+ return result.Payloads, nil
+}
+
+// process incoming command
+func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
+ const op = errors.Op("handleMessage")
+ var err error
+
+ var (
+ id = msg.ID
+ cmd = msg.Command
+ payloads = msg.Payloads
+ )
+
+ switch cmd := cmd.(type) {
+ case *rrt.ExecuteActivity:
+ params := cmd.ActivityParams(wf.env, payloads)
+ activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelActivity(activityID)
+ return nil
+ })
+
+ case *rrt.ExecuteChildWorkflow:
+ params := cmd.WorkflowParams(wf.env, payloads)
+
+ // always use deterministic id
+ if params.WorkflowID == "" {
+ nextID := atomic.AddUint64(&wf.seqID, 1)
+ params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
+ }
+
+ wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
+ wf.ids.push(id, r, e)
+ })
+
+ wf.canceller.register(id, func() error {
+ wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
+ return nil
+ })
+
+ case *rrt.GetChildWorkflowExecution:
+ wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
+ cl := wf.createCallback(id)
+
+ // TODO rewrite
+ if err != nil {
+ panic(err)
+ }
+
+ p, err := wf.env.GetDataConverter().ToPayloads(w)
+ if err != nil {
+ panic(err)
+ }
+
+ cl(p, err)
+ })
+
+ case *rrt.NewTimer:
+ timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
+ wf.canceller.register(id, func() error {
+ if timerID != nil {
+ wf.env.RequestCancelTimer(*timerID)
+ }
+ return nil
+ })
+
+ case *rrt.GetVersion:
+ version := wf.env.GetVersion(
+ cmd.ChangeID,
+ workflow.Version(cmd.MinSupported),
+ workflow.Version(cmd.MaxSupported),
+ )
+
+ result, err := wf.env.GetDataConverter().ToPayloads(version)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.SideEffect:
+ wf.env.SideEffect(
+ func() (*commonpb.Payloads, error) {
+ return payloads, nil
+ },
+ wf.createContinuableCallback(id),
+ )
+
+ case *rrt.CompleteWorkflow:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ if msg.Failure == nil {
+ wf.env.Complete(payloads, nil)
+ } else {
+ wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
+ }
+
+ case *rrt.ContinueAsNew:
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ wf.env.Complete(nil, &workflow.ContinueAsNewError{
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ Header: wf.header,
+ TaskQueueName: cmd.Options.TaskQueueName,
+ WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
+ WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
+ WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
+ })
+
+ case *rrt.SignalExternalWorkflow:
+ wf.env.SignalExternalWorkflow(
+ cmd.Namespace,
+ cmd.WorkflowID,
+ cmd.RunID,
+ cmd.Signal,
+ payloads,
+ nil,
+ cmd.ChildWorkflowOnly,
+ wf.createCallback(id),
+ )
+
+ case *rrt.CancelExternalWorkflow:
+ wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
+
+ case *rrt.Cancel:
+ err = wf.canceller.cancel(cmd.CommandIDs...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ result, _ := wf.env.GetDataConverter().ToPayloads("completed")
+ wf.mq.pushResponse(id, result)
+
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+
+ case *rrt.Panic:
+ panic(errors.E(cmd.Message))
+
+ default:
+ panic("undefined command")
+ }
+
+ return nil
+}
+
+func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) error {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return nil
+ }
+
+ // fetch original payload
+ wf.mq.pushResponse(id, result)
+ return nil
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ // timer cancel callback can happen inside the loop
+ if wf.inLoop {
+ err := callback(result, err)
+ if err != nil {
+ panic(err)
+ }
+
+ return
+ }
+
+ wf.callbacks = append(wf.callbacks, func() error {
+ return callback(result, err)
+ })
+ }
+}
+
+// callback to be called inside the queue processing, adds new messages at the end of the queue
+func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
+ callback := func(result *commonpb.Payloads, err error) {
+ wf.canceller.discard(id)
+
+ if err != nil {
+ wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
+ return
+ }
+
+ wf.mq.pushResponse(id, result)
+ err = wf.flushQueue()
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ return func(result *commonpb.Payloads, err error) {
+ callback(result, err)
+ }
+}
+
+// Exchange messages between host and worker processes and add new commands to the queue.
+func (wf *workflowProcess) flushQueue() error {
+ const op = errors.Op("flush queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ wf.pipeline = append(wf.pipeline, messages...)
+
+ return nil
+}
+
+// Exchange messages between host and worker processes without adding new commands to the queue.
+func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
+ const op = errors.Op("discard queue")
+ messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
+ wf.mq.flush()
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return messages, nil
+}
+
+// Run single command and return single result.
+func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
+ const op = errors.Op("workflow_process_runcommand")
+ _, msg := wf.mq.allocateMessage(cmd, payloads)
+
+ result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
+ if err != nil {
+ return rrt.Message{}, errors.E(op, err)
+ }
+
+ if len(result) != 1 {
+ return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
+ }
+
+ return result[0], nil
+}