diff options
Diffstat (limited to 'plugins/temporal/protocol/message.go')
-rw-r--r-- | plugins/temporal/protocol/message.go | 334 |
1 files changed, 334 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go new file mode 100644 index 00000000..d5e0f49d --- /dev/null +++ b/plugins/temporal/protocol/message.go @@ -0,0 +1,334 @@ +package protocol + +import ( + "time" + + "github.com/spiral/errors" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +const ( + getWorkerInfoCommand = "GetWorkerInfo" + + invokeActivityCommand = "InvokeActivity" + startWorkflowCommand = "StartWorkflow" + invokeSignalCommand = "InvokeSignal" + invokeQueryCommand = "InvokeQuery" + destroyWorkflowCommand = "DestroyWorkflow" + cancelWorkflowCommand = "CancelWorkflow" + getStackTraceCommand = "StackTrace" + + executeActivityCommand = "ExecuteActivity" + executeChildWorkflowCommand = "ExecuteChildWorkflow" + getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" + + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + continueAsNewCommand = "ContinueAsNew" + + signalExternalWorkflowCommand = "SignalExternalWorkflow" + cancelExternalWorkflowCommand = "CancelExternalWorkflow" + + cancelCommand = "Cancel" + panicCommand = "Panic" +) + +// GetWorkerInfo reads worker information. +type GetWorkerInfo struct{} + +// InvokeActivity invokes activity. +type InvokeActivity struct { + // Name defines activity name. + Name string `json:"name"` + + // Info contains execution context. + Info activity.Info `json:"info"` + + // HeartbeatDetails indicates that the payload also contains last heartbeat details. + HeartbeatDetails int `json:"heartbeatDetails,omitempty"` +} + +// StartWorkflow sends worker command to start workflow. +type StartWorkflow struct { + // Info to define workflow context. + Info *workflow.Info `json:"info"` + + // LastCompletion contains offset of last completion results. + LastCompletion int `json:"lastCompletion,omitempty"` +} + +// InvokeSignal invokes signal with a set of arguments. +type InvokeSignal struct { + // RunID workflow run id. + RunID string `json:"runId"` + + // Name of the signal. + Name string `json:"name"` +} + +// InvokeQuery invokes query with a set of arguments. +type InvokeQuery struct { + // RunID workflow run id. + RunID string `json:"runId"` + // Name of the query. + Name string `json:"name"` +} + +// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). +type CancelWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// DestroyWorkflow asks worker to offload workflow from memory. +type DestroyWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// GetStackTrace asks worker to offload workflow from memory. +type GetStackTrace struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// ExecuteActivity command by workflow worker. +type ExecuteActivity struct { + // Name defines activity name. + Name string `json:"name"` + // Options to run activity. + Options bindings.ExecuteActivityOptions `json:"options,omitempty"` +} + +// ExecuteChildWorkflow executes child workflow. +type ExecuteChildWorkflow struct { + // Name defines workflow name. + Name string `json:"name"` + // Options to run activity. + Options bindings.WorkflowOptions `json:"options,omitempty"` +} + +// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. +type GetChildWorkflowExecution struct { + // ID of child workflow command. + ID uint64 `json:"id"` +} + +// NewTimer starts new timer. +type NewTimer struct { + // Milliseconds defines timer duration. + Milliseconds int `json:"ms"` +} + +// SideEffect to be recorded into the history. +type SideEffect struct{} + +// GetVersion requests version marker. +type GetVersion struct { + ChangeID string `json:"changeID"` + MinSupported int `json:"minSupported"` + MaxSupported int `json:"maxSupported"` +} + +// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. +type CompleteWorkflow struct{} + +// ContinueAsNew restarts workflow with new running instance. +type ContinueAsNew struct { + // Result defines workflow execution result. + Name string `json:"name"` + + // Options for continued as new workflow. + Options struct { + TaskQueueName string + WorkflowExecutionTimeout time.Duration + WorkflowRunTimeout time.Duration + WorkflowTaskTimeout time.Duration + } `json:"options"` +} + +// SignalExternalWorkflow sends signal to external workflow. +type SignalExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` + Signal string `json:"signal"` + ChildWorkflowOnly bool `json:"childWorkflowOnly"` +} + +// CancelExternalWorkflow canceller external workflow. +type CancelExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` +} + +// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). +type Cancel struct { + // CommandIDs to be cancelled. + CommandIDs []uint64 `json:"ids"` +} + +// Panic triggers panic in workflow process. +type Panic struct { + // Message to include into the error. + Message string `json:"message"` +} + +// ActivityParams maps activity command to activity params. +func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { + params := bindings.ExecuteActivityParams{ + ExecuteActivityOptions: cmd.Options, + ActivityType: bindings.ActivityType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// WorkflowParams maps workflow command to workflow params. +func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { + params := bindings.ExecuteWorkflowParams{ + WorkflowOptions: cmd.Options, + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// ToDuration converts timer command to time.Duration. +func (cmd NewTimer) ToDuration() time.Duration { + return time.Millisecond * time.Duration(cmd.Milliseconds) +} + +// returns command name (only for the commands sent to the worker) +func commandName(cmd interface{}) (string, error) { + const op = errors.Op("command_name") + switch cmd.(type) { + case GetWorkerInfo, *GetWorkerInfo: + return getWorkerInfoCommand, nil + case StartWorkflow, *StartWorkflow: + return startWorkflowCommand, nil + case InvokeSignal, *InvokeSignal: + return invokeSignalCommand, nil + case InvokeQuery, *InvokeQuery: + return invokeQueryCommand, nil + case DestroyWorkflow, *DestroyWorkflow: + return destroyWorkflowCommand, nil + case CancelWorkflow, *CancelWorkflow: + return cancelWorkflowCommand, nil + case GetStackTrace, *GetStackTrace: + return getStackTraceCommand, nil + case InvokeActivity, *InvokeActivity: + return invokeActivityCommand, nil + case ExecuteActivity, *ExecuteActivity: + return executeActivityCommand, nil + case ExecuteChildWorkflow, *ExecuteChildWorkflow: + return executeChildWorkflowCommand, nil + case GetChildWorkflowExecution, *GetChildWorkflowExecution: + return getChildWorkflowExecutionCommand, nil + case NewTimer, *NewTimer: + return newTimerCommand, nil + case GetVersion, *GetVersion: + return getVersionCommand, nil + case SideEffect, *SideEffect: + return sideEffectCommand, nil + case CompleteWorkflow, *CompleteWorkflow: + return completeWorkflowCommand, nil + case ContinueAsNew, *ContinueAsNew: + return continueAsNewCommand, nil + case SignalExternalWorkflow, *SignalExternalWorkflow: + return signalExternalWorkflowCommand, nil + case CancelExternalWorkflow, *CancelExternalWorkflow: + return cancelExternalWorkflowCommand, nil + case Cancel, *Cancel: + return cancelCommand, nil + case Panic, *Panic: + return panicCommand, nil + default: + return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) + } +} + +// reads command from binary payload +func initCommand(name string) (interface{}, error) { + const op = errors.Op("init_command") + switch name { + case getWorkerInfoCommand: + return &GetWorkerInfo{}, nil + + case startWorkflowCommand: + return &StartWorkflow{}, nil + + case invokeSignalCommand: + return &InvokeSignal{}, nil + + case invokeQueryCommand: + return &InvokeQuery{}, nil + + case destroyWorkflowCommand: + return &DestroyWorkflow{}, nil + + case cancelWorkflowCommand: + return &CancelWorkflow{}, nil + + case getStackTraceCommand: + return &GetStackTrace{}, nil + + case invokeActivityCommand: + return &InvokeActivity{}, nil + + case executeActivityCommand: + return &ExecuteActivity{}, nil + + case executeChildWorkflowCommand: + return &ExecuteChildWorkflow{}, nil + + case getChildWorkflowExecutionCommand: + return &GetChildWorkflowExecution{}, nil + + case newTimerCommand: + return &NewTimer{}, nil + + case getVersionCommand: + return &GetVersion{}, nil + + case sideEffectCommand: + return &SideEffect{}, nil + + case completeWorkflowCommand: + return &CompleteWorkflow{}, nil + + case continueAsNewCommand: + return &ContinueAsNew{}, nil + + case signalExternalWorkflowCommand: + return &SignalExternalWorkflow{}, nil + + case cancelExternalWorkflowCommand: + return &CancelExternalWorkflow{}, nil + + case cancelCommand: + return &Cancel{}, nil + + case panicCommand: + return &Panic{}, nil + + default: + return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) + } +} |