summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol/message.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/protocol/message.go')
-rw-r--r--plugins/temporal/protocol/message.go334
1 files changed, 0 insertions, 334 deletions
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
deleted file mode 100644
index d5e0f49d..00000000
--- a/plugins/temporal/protocol/message.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package protocol
-
-import (
- "time"
-
- "github.com/spiral/errors"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-const (
- getWorkerInfoCommand = "GetWorkerInfo"
-
- invokeActivityCommand = "InvokeActivity"
- startWorkflowCommand = "StartWorkflow"
- invokeSignalCommand = "InvokeSignal"
- invokeQueryCommand = "InvokeQuery"
- destroyWorkflowCommand = "DestroyWorkflow"
- cancelWorkflowCommand = "CancelWorkflow"
- getStackTraceCommand = "StackTrace"
-
- executeActivityCommand = "ExecuteActivity"
- executeChildWorkflowCommand = "ExecuteChildWorkflow"
- getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
-
- newTimerCommand = "NewTimer"
- sideEffectCommand = "SideEffect"
- getVersionCommand = "GetVersion"
- completeWorkflowCommand = "CompleteWorkflow"
- continueAsNewCommand = "ContinueAsNew"
-
- signalExternalWorkflowCommand = "SignalExternalWorkflow"
- cancelExternalWorkflowCommand = "CancelExternalWorkflow"
-
- cancelCommand = "Cancel"
- panicCommand = "Panic"
-)
-
-// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct{}
-
-// InvokeActivity invokes activity.
-type InvokeActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
-
- // Info contains execution context.
- Info activity.Info `json:"info"`
-
- // HeartbeatDetails indicates that the payload also contains last heartbeat details.
- HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
-}
-
-// StartWorkflow sends worker command to start workflow.
-type StartWorkflow struct {
- // Info to define workflow context.
- Info *workflow.Info `json:"info"`
-
- // LastCompletion contains offset of last completion results.
- LastCompletion int `json:"lastCompletion,omitempty"`
-}
-
-// InvokeSignal invokes signal with a set of arguments.
-type InvokeSignal struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-
- // Name of the signal.
- Name string `json:"name"`
-}
-
-// InvokeQuery invokes query with a set of arguments.
-type InvokeQuery struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
- // Name of the query.
- Name string `json:"name"`
-}
-
-// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
-type CancelWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// DestroyWorkflow asks worker to offload workflow from memory.
-type DestroyWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// GetStackTrace asks worker to offload workflow from memory.
-type GetStackTrace struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// ExecuteActivity command by workflow worker.
-type ExecuteActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
-}
-
-// ExecuteChildWorkflow executes child workflow.
-type ExecuteChildWorkflow struct {
- // Name defines workflow name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.WorkflowOptions `json:"options,omitempty"`
-}
-
-// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
-type GetChildWorkflowExecution struct {
- // ID of child workflow command.
- ID uint64 `json:"id"`
-}
-
-// NewTimer starts new timer.
-type NewTimer struct {
- // Milliseconds defines timer duration.
- Milliseconds int `json:"ms"`
-}
-
-// SideEffect to be recorded into the history.
-type SideEffect struct{}
-
-// GetVersion requests version marker.
-type GetVersion struct {
- ChangeID string `json:"changeID"`
- MinSupported int `json:"minSupported"`
- MaxSupported int `json:"maxSupported"`
-}
-
-// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
-type CompleteWorkflow struct{}
-
-// ContinueAsNew restarts workflow with new running instance.
-type ContinueAsNew struct {
- // Result defines workflow execution result.
- Name string `json:"name"`
-
- // Options for continued as new workflow.
- Options struct {
- TaskQueueName string
- WorkflowExecutionTimeout time.Duration
- WorkflowRunTimeout time.Duration
- WorkflowTaskTimeout time.Duration
- } `json:"options"`
-}
-
-// SignalExternalWorkflow sends signal to external workflow.
-type SignalExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
- Signal string `json:"signal"`
- ChildWorkflowOnly bool `json:"childWorkflowOnly"`
-}
-
-// CancelExternalWorkflow canceller external workflow.
-type CancelExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
-}
-
-// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
-type Cancel struct {
- // CommandIDs to be cancelled.
- CommandIDs []uint64 `json:"ids"`
-}
-
-// Panic triggers panic in workflow process.
-type Panic struct {
- // Message to include into the error.
- Message string `json:"message"`
-}
-
-// ActivityParams maps activity command to activity params.
-func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
- params := bindings.ExecuteActivityParams{
- ExecuteActivityOptions: cmd.Options,
- ActivityType: bindings.ActivityType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// WorkflowParams maps workflow command to workflow params.
-func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
- params := bindings.ExecuteWorkflowParams{
- WorkflowOptions: cmd.Options,
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// ToDuration converts timer command to time.Duration.
-func (cmd NewTimer) ToDuration() time.Duration {
- return time.Millisecond * time.Duration(cmd.Milliseconds)
-}
-
-// returns command name (only for the commands sent to the worker)
-func commandName(cmd interface{}) (string, error) {
- const op = errors.Op("command_name")
- switch cmd.(type) {
- case GetWorkerInfo, *GetWorkerInfo:
- return getWorkerInfoCommand, nil
- case StartWorkflow, *StartWorkflow:
- return startWorkflowCommand, nil
- case InvokeSignal, *InvokeSignal:
- return invokeSignalCommand, nil
- case InvokeQuery, *InvokeQuery:
- return invokeQueryCommand, nil
- case DestroyWorkflow, *DestroyWorkflow:
- return destroyWorkflowCommand, nil
- case CancelWorkflow, *CancelWorkflow:
- return cancelWorkflowCommand, nil
- case GetStackTrace, *GetStackTrace:
- return getStackTraceCommand, nil
- case InvokeActivity, *InvokeActivity:
- return invokeActivityCommand, nil
- case ExecuteActivity, *ExecuteActivity:
- return executeActivityCommand, nil
- case ExecuteChildWorkflow, *ExecuteChildWorkflow:
- return executeChildWorkflowCommand, nil
- case GetChildWorkflowExecution, *GetChildWorkflowExecution:
- return getChildWorkflowExecutionCommand, nil
- case NewTimer, *NewTimer:
- return newTimerCommand, nil
- case GetVersion, *GetVersion:
- return getVersionCommand, nil
- case SideEffect, *SideEffect:
- return sideEffectCommand, nil
- case CompleteWorkflow, *CompleteWorkflow:
- return completeWorkflowCommand, nil
- case ContinueAsNew, *ContinueAsNew:
- return continueAsNewCommand, nil
- case SignalExternalWorkflow, *SignalExternalWorkflow:
- return signalExternalWorkflowCommand, nil
- case CancelExternalWorkflow, *CancelExternalWorkflow:
- return cancelExternalWorkflowCommand, nil
- case Cancel, *Cancel:
- return cancelCommand, nil
- case Panic, *Panic:
- return panicCommand, nil
- default:
- return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
- }
-}
-
-// reads command from binary payload
-func initCommand(name string) (interface{}, error) {
- const op = errors.Op("init_command")
- switch name {
- case getWorkerInfoCommand:
- return &GetWorkerInfo{}, nil
-
- case startWorkflowCommand:
- return &StartWorkflow{}, nil
-
- case invokeSignalCommand:
- return &InvokeSignal{}, nil
-
- case invokeQueryCommand:
- return &InvokeQuery{}, nil
-
- case destroyWorkflowCommand:
- return &DestroyWorkflow{}, nil
-
- case cancelWorkflowCommand:
- return &CancelWorkflow{}, nil
-
- case getStackTraceCommand:
- return &GetStackTrace{}, nil
-
- case invokeActivityCommand:
- return &InvokeActivity{}, nil
-
- case executeActivityCommand:
- return &ExecuteActivity{}, nil
-
- case executeChildWorkflowCommand:
- return &ExecuteChildWorkflow{}, nil
-
- case getChildWorkflowExecutionCommand:
- return &GetChildWorkflowExecution{}, nil
-
- case newTimerCommand:
- return &NewTimer{}, nil
-
- case getVersionCommand:
- return &GetVersion{}, nil
-
- case sideEffectCommand:
- return &SideEffect{}, nil
-
- case completeWorkflowCommand:
- return &CompleteWorkflow{}, nil
-
- case continueAsNewCommand:
- return &ContinueAsNew{}, nil
-
- case signalExternalWorkflowCommand:
- return &SignalExternalWorkflow{}, nil
-
- case cancelExternalWorkflowCommand:
- return &CancelExternalWorkflow{}, nil
-
- case cancelCommand:
- return &Cancel{}, nil
-
- case panicCommand:
- return &Panic{}, nil
-
- default:
- return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
- }
-}