summaryrefslogtreecommitdiff
path: root/plugins/temporal/workflow/process.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/workflow/process.go')
-rw-r--r--plugins/temporal/workflow/process.go436
1 files changed, 0 insertions, 436 deletions
diff --git a/plugins/temporal/workflow/process.go b/plugins/temporal/workflow/process.go
deleted file mode 100644
index 45e6885c..00000000
--- a/plugins/temporal/workflow/process.go
+++ /dev/null
@@ -1,436 +0,0 @@
-package workflow
-
-import (
- "strconv"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- rrt "github.com/spiral/roadrunner/v2/plugins/temporal/protocol"
- commonpb "go.temporal.io/api/common/v1"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-// wraps single workflow process
-type workflowProcess struct {
- codec rrt.Codec
- pool workflowPool
- env bindings.WorkflowEnvironment
- header *commonpb.Header
- mq *messageQueue
- ids *idRegistry
- seqID uint64
- runID string
- pipeline []rrt.Message
- callbacks []func() error
- canceller *canceller
- inLoop bool
-}
-
-// Execute workflow, bootstraps process.
-func (wf *workflowProcess) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) {
- wf.env = env
- wf.header = header
- wf.seqID = 0
- wf.runID = env.WorkflowInfo().WorkflowExecution.RunID
- wf.canceller = &canceller{}
-
- // sequenceID shared for all worker workflows
- wf.mq = newMessageQueue(wf.pool.SeqID)
- wf.ids = newIDRegistry()
-
- env.RegisterCancelHandler(wf.handleCancel)
- env.RegisterSignalHandler(wf.handleSignal)
- env.RegisterQueryHandler(wf.handleQuery)
-
- var (
- lastCompletion = bindings.GetLastCompletionResult(env)
- lastCompletionOffset = 0
- )
-
- if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
- if input == nil {
- input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
- }
-
- input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
- lastCompletionOffset = len(lastCompletion.Payloads)
- }
-
- _ = wf.mq.pushCommand(
- rrt.StartWorkflow{
- Info: env.WorkflowInfo(),
- LastCompletion: lastCompletionOffset,
- },
- input,
- )
-}
-
-// OnWorkflowTaskStarted handles single workflow tick and batch of pipeline from temporal server.
-func (wf *workflowProcess) OnWorkflowTaskStarted() {
- wf.inLoop = true
- defer func() { wf.inLoop = false }()
-
- var err error
- for _, callback := range wf.callbacks {
- err = callback()
- if err != nil {
- panic(err)
- }
- }
- wf.callbacks = nil
-
- if err := wf.flushQueue(); err != nil {
- panic(err)
- }
-
- for len(wf.pipeline) > 0 {
- msg := wf.pipeline[0]
- wf.pipeline = wf.pipeline[1:]
-
- if msg.IsCommand() {
- err = wf.handleMessage(msg)
- }
-
- if err != nil {
- panic(err)
- }
- }
-}
-
-// StackTrace renders workflow stack trace.
-func (wf *workflowProcess) StackTrace() string {
- result, err := wf.runCommand(
- rrt.GetStackTrace{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- },
- nil,
- )
-
- if err != nil {
- return err.Error()
- }
-
- var stacktrace string
- err = wf.env.GetDataConverter().FromPayload(result.Payloads.Payloads[0], &stacktrace)
- if err != nil {
- return err.Error()
- }
-
- return stacktrace
-}
-
-// Close the workflow.
-func (wf *workflowProcess) Close() {
- // TODO: properly handle errors
- // panic(err)
-
- _ = wf.mq.pushCommand(
- rrt.DestroyWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-
- _, _ = wf.discardQueue()
-}
-
-// execution context.
-func (wf *workflowProcess) getContext() rrt.Context {
- return rrt.Context{
- TaskQueue: wf.env.WorkflowInfo().TaskQueueName,
- TickTime: wf.env.Now().Format(time.RFC3339),
- Replay: wf.env.IsReplaying(),
- }
-}
-
-// schedule cancel command
-func (wf *workflowProcess) handleCancel() {
- _ = wf.mq.pushCommand(
- rrt.CancelWorkflow{RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID},
- nil,
- )
-}
-
-// schedule the signal processing
-func (wf *workflowProcess) handleSignal(name string, input *commonpb.Payloads) {
- _ = wf.mq.pushCommand(
- rrt.InvokeSignal{
- RunID: wf.env.WorkflowInfo().WorkflowExecution.RunID,
- Name: name,
- },
- input,
- )
-}
-
-// Handle query in blocking mode.
-func (wf *workflowProcess) handleQuery(queryType string, queryArgs *commonpb.Payloads) (*commonpb.Payloads, error) {
- result, err := wf.runCommand(
- rrt.InvokeQuery{
- RunID: wf.runID,
- Name: queryType,
- },
- queryArgs,
- )
-
- if err != nil {
- return nil, err
- }
-
- if result.Failure != nil {
- return nil, bindings.ConvertFailureToError(result.Failure, wf.env.GetDataConverter())
- }
-
- return result.Payloads, nil
-}
-
-// process incoming command
-func (wf *workflowProcess) handleMessage(msg rrt.Message) error {
- const op = errors.Op("handleMessage")
- var err error
-
- var (
- id = msg.ID
- cmd = msg.Command
- payloads = msg.Payloads
- )
-
- switch cmd := cmd.(type) {
- case *rrt.ExecuteActivity:
- params := cmd.ActivityParams(wf.env, payloads)
- activityID := wf.env.ExecuteActivity(params, wf.createCallback(id))
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelActivity(activityID)
- return nil
- })
-
- case *rrt.ExecuteChildWorkflow:
- params := cmd.WorkflowParams(wf.env, payloads)
-
- // always use deterministic id
- if params.WorkflowID == "" {
- nextID := atomic.AddUint64(&wf.seqID, 1)
- params.WorkflowID = wf.env.WorkflowInfo().WorkflowExecution.RunID + "_" + strconv.Itoa(int(nextID))
- }
-
- wf.env.ExecuteChildWorkflow(params, wf.createCallback(id), func(r bindings.WorkflowExecution, e error) {
- wf.ids.push(id, r, e)
- })
-
- wf.canceller.register(id, func() error {
- wf.env.RequestCancelChildWorkflow(params.Namespace, params.WorkflowID)
- return nil
- })
-
- case *rrt.GetChildWorkflowExecution:
- wf.ids.listen(cmd.ID, func(w bindings.WorkflowExecution, err error) {
- cl := wf.createCallback(id)
-
- // TODO rewrite
- if err != nil {
- panic(err)
- }
-
- p, err := wf.env.GetDataConverter().ToPayloads(w)
- if err != nil {
- panic(err)
- }
-
- cl(p, err)
- })
-
- case *rrt.NewTimer:
- timerID := wf.env.NewTimer(cmd.ToDuration(), wf.createCallback(id))
- wf.canceller.register(id, func() error {
- if timerID != nil {
- wf.env.RequestCancelTimer(*timerID)
- }
- return nil
- })
-
- case *rrt.GetVersion:
- version := wf.env.GetVersion(
- cmd.ChangeID,
- workflow.Version(cmd.MinSupported),
- workflow.Version(cmd.MaxSupported),
- )
-
- result, err := wf.env.GetDataConverter().ToPayloads(version)
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.SideEffect:
- wf.env.SideEffect(
- func() (*commonpb.Payloads, error) {
- return payloads, nil
- },
- wf.createContinuableCallback(id),
- )
-
- case *rrt.CompleteWorkflow:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- if msg.Failure == nil {
- wf.env.Complete(payloads, nil)
- } else {
- wf.env.Complete(nil, bindings.ConvertFailureToError(msg.Failure, wf.env.GetDataConverter()))
- }
-
- case *rrt.ContinueAsNew:
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- wf.env.Complete(nil, &workflow.ContinueAsNewError{
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- Header: wf.header,
- TaskQueueName: cmd.Options.TaskQueueName,
- WorkflowExecutionTimeout: cmd.Options.WorkflowExecutionTimeout,
- WorkflowRunTimeout: cmd.Options.WorkflowRunTimeout,
- WorkflowTaskTimeout: cmd.Options.WorkflowTaskTimeout,
- })
-
- case *rrt.SignalExternalWorkflow:
- wf.env.SignalExternalWorkflow(
- cmd.Namespace,
- cmd.WorkflowID,
- cmd.RunID,
- cmd.Signal,
- payloads,
- nil,
- cmd.ChildWorkflowOnly,
- wf.createCallback(id),
- )
-
- case *rrt.CancelExternalWorkflow:
- wf.env.RequestCancelExternalWorkflow(cmd.Namespace, cmd.WorkflowID, cmd.RunID, wf.createCallback(id))
-
- case *rrt.Cancel:
- err = wf.canceller.cancel(cmd.CommandIDs...)
- if err != nil {
- return errors.E(op, err)
- }
-
- result, _ := wf.env.GetDataConverter().ToPayloads("completed")
- wf.mq.pushResponse(id, result)
-
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
-
- case *rrt.Panic:
- panic(errors.E(cmd.Message))
-
- default:
- panic("undefined command")
- }
-
- return nil
-}
-
-func (wf *workflowProcess) createCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) error {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return nil
- }
-
- // fetch original payload
- wf.mq.pushResponse(id, result)
- return nil
- }
-
- return func(result *commonpb.Payloads, err error) {
- // timer cancel callback can happen inside the loop
- if wf.inLoop {
- err := callback(result, err)
- if err != nil {
- panic(err)
- }
-
- return
- }
-
- wf.callbacks = append(wf.callbacks, func() error {
- return callback(result, err)
- })
- }
-}
-
-// callback to be called inside the queue processing, adds new messages at the end of the queue
-func (wf *workflowProcess) createContinuableCallback(id uint64) bindings.ResultHandler {
- callback := func(result *commonpb.Payloads, err error) {
- wf.canceller.discard(id)
-
- if err != nil {
- wf.mq.pushError(id, bindings.ConvertErrorToFailure(err, wf.env.GetDataConverter()))
- return
- }
-
- wf.mq.pushResponse(id, result)
- err = wf.flushQueue()
- if err != nil {
- panic(err)
- }
- }
-
- return func(result *commonpb.Payloads, err error) {
- callback(result, err)
- }
-}
-
-// Exchange messages between host and worker processes and add new commands to the queue.
-func (wf *workflowProcess) flushQueue() error {
- const op = errors.Op("flush queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return errors.E(op, err)
- }
-
- wf.pipeline = append(wf.pipeline, messages...)
-
- return nil
-}
-
-// Exchange messages between host and worker processes without adding new commands to the queue.
-func (wf *workflowProcess) discardQueue() ([]rrt.Message, error) {
- const op = errors.Op("discard queue")
- messages, err := wf.codec.Execute(wf.pool, wf.getContext(), wf.mq.queue...)
- wf.mq.flush()
-
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return messages, nil
-}
-
-// Run single command and return single result.
-func (wf *workflowProcess) runCommand(cmd interface{}, payloads *commonpb.Payloads) (rrt.Message, error) {
- const op = errors.Op("workflow_process_runcommand")
- _, msg := wf.mq.allocateMessage(cmd, payloads)
-
- result, err := wf.codec.Execute(wf.pool, wf.getContext(), msg)
- if err != nil {
- return rrt.Message{}, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return rrt.Message{}, errors.E(op, errors.Str("unexpected worker response"))
- }
-
- return result[0], nil
-}