diff options
Diffstat (limited to 'plugins/temporal/protocol')
-rw-r--r-- | plugins/temporal/protocol/converter.go | 76 | ||||
-rw-r--r-- | plugins/temporal/protocol/converter_test.go | 23 | ||||
-rw-r--r-- | plugins/temporal/protocol/internal/protocol.pb.go | 167 | ||||
-rw-r--r-- | plugins/temporal/protocol/json_codec.go | 225 | ||||
-rw-r--r-- | plugins/temporal/protocol/message.go | 334 | ||||
-rw-r--r-- | plugins/temporal/protocol/proto_codec.go | 145 | ||||
-rw-r--r-- | plugins/temporal/protocol/protocol.go | 77 | ||||
-rw-r--r-- | plugins/temporal/protocol/worker_info.go | 72 |
8 files changed, 0 insertions, 1119 deletions
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go deleted file mode 100644 index 406e70f4..00000000 --- a/plugins/temporal/protocol/converter.go +++ /dev/null @@ -1,76 +0,0 @@ -package protocol - -import ( - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -type ( - // DataConverter wraps Temporal data converter to enable direct access to the payloads. - DataConverter struct { - fallback converter.DataConverter - } -) - -// NewDataConverter creates new data converter. -func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { - return &DataConverter{fallback: fallback} -} - -// ToPayloads converts a list of values. -func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { - for _, v := range values { - if aggregated, ok := v.(*commonpb.Payloads); ok { - // bypassing - return aggregated, nil - } - } - - return r.fallback.ToPayloads(values...) -} - -// ToPayload converts single value to payload. -func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { - return r.fallback.ToPayload(value) -} - -// FromPayloads converts to a list of values of different types. -// Useful for deserializing arguments of function invocations. -func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { - if payloads == nil { - return nil - } - - if len(valuePtrs) == 1 { - // input proxying - if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { - *input = &commonpb.Payloads{} - (*input).Payloads = payloads.Payloads - return nil - } - } - - for i := 0; i < len(payloads.Payloads); i++ { - err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) - if err != nil { - return err - } - } - - return nil -} - -// FromPayload converts single value from payload. -func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { - return r.fallback.FromPayload(payload, valuePtr) -} - -// ToString converts payload object into human readable string. -func (r *DataConverter) ToString(input *commonpb.Payload) string { - return r.fallback.ToString(input) -} - -// ToStrings converts payloads object into human readable strings. -func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { - return r.fallback.ToStrings(input) -} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go deleted file mode 100644 index 6ce9fa0f..00000000 --- a/plugins/temporal/protocol/converter_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package protocol - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/converter" -) - -func Test_Passthough(t *testing.T) { - codec := NewDataConverter(converter.GetDefaultDataConverter()) - - value, err := codec.ToPayloads("test") - assert.NoError(t, err) - - out := &common.Payloads{} - - assert.Len(t, out.Payloads, 0) - assert.NoError(t, codec.FromPayloads(value, &out)) - - assert.Len(t, out.Payloads, 1) -} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go deleted file mode 100644 index c554e28f..00000000 --- a/plugins/temporal/protocol/internal/protocol.pb.go +++ /dev/null @@ -1,167 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: protocol.proto - -package internal - -import ( - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" - v11 "go.temporal.io/api/common/v1" - v1 "go.temporal.io/api/failure/v1" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Frame struct { - Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Frame) Reset() { *m = Frame{} } -func (m *Frame) String() string { return proto.CompactTextString(m) } -func (*Frame) ProtoMessage() {} -func (*Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{0} -} - -func (m *Frame) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Frame.Unmarshal(m, b) -} -func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Frame.Marshal(b, m, deterministic) -} -func (m *Frame) XXX_Merge(src proto.Message) { - xxx_messageInfo_Frame.Merge(m, src) -} -func (m *Frame) XXX_Size() int { - return xxx_messageInfo_Frame.Size(m) -} -func (m *Frame) XXX_DiscardUnknown() { - xxx_messageInfo_Frame.DiscardUnknown(m) -} - -var xxx_messageInfo_Frame proto.InternalMessageInfo - -func (m *Frame) GetMessages() []*Message { - if m != nil { - return m.Messages - } - return nil -} - -// Single communication message. -type Message struct { - Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - // command name (if any) - Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` - // command options in json format. - Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` - // error response. - Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` - // invocation or result payloads. - Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_2bc2336598a3f7e0, []int{1} -} - -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetId() uint64 { - if m != nil { - return m.Id - } - return 0 -} - -func (m *Message) GetCommand() string { - if m != nil { - return m.Command - } - return "" -} - -func (m *Message) GetOptions() []byte { - if m != nil { - return m.Options - } - return nil -} - -func (m *Message) GetFailure() *v1.Failure { - if m != nil { - return m.Failure - } - return nil -} - -func (m *Message) GetPayloads() *v11.Payloads { - if m != nil { - return m.Payloads - } - return nil -} - -func init() { - proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") - proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") -} - -func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } - -var fileDescriptor_2bc2336598a3f7e0 = []byte{ - // 257 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, - 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, - 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, - 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, - 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, - 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, - 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, - 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, - 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, - 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, - 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, - 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, - 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, - 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, - 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, - 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, - 0x00, -} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go deleted file mode 100644 index e7a77068..00000000 --- a/plugins/temporal/protocol/json_codec.go +++ /dev/null @@ -1,225 +0,0 @@ -package protocol - -import ( - "github.com/fatih/color" - j "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -// JSONCodec can be used for debugging and log capturing reasons. -type JSONCodec struct { - // level enables verbose logging or all incoming and outcoming messages. - level DebugLevel - - // logger renders messages when debug enabled. - logger logger.Logger -} - -// jsonFrame contains message command in binary form. -type jsonFrame struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command name. Optional. - Command string `json:"command,omitempty"` - - // Options to be unmarshalled to body (raw payload). - Options j.RawMessage `json:"options,omitempty"` - - // Failure associated with command id. - Failure []byte `json:"failure,omitempty"` - - // Payloads specific to the command or result. - Payloads []byte `json:"payloads,omitempty"` -} - -// NewJSONCodec creates new Json communication codec. -func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { - return &JSONCodec{ - level: level, - logger: logger, - } -} - -// WithLogger creates new codes instance with attached logger. -func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { - return &JSONCodec{ - level: c.level, - logger: logger, - } -} - -// GetName returns codec name. -func (c *JSONCodec) GetName() string { - return "json" -} - -// Execute exchanges commands with worker. -func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - const op = errors.Op("json_codec_execute") - if len(msg) == 0 { - return nil, nil - } - - var response = make([]jsonFrame, 0, 5) - var result = make([]Message, 0, 5) - var err error - - frames := make([]jsonFrame, 0, len(msg)) - for _, m := range msg { - frame, err := c.packFrame(m) - if err != nil { - return nil, errors.E(op, err) - } - - frames = append(frames, frame) - } - - p := payload.Payload{} - - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = json.Marshal(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - p.Body, err = json.Marshal(frames) - if err != nil { - return nil, errors.E(op, err) - } - - if c.level >= DebugNormal { - logMessage := string(p.Body) + " " + string(p.Context) - if c.level >= DebugHumanized { - logMessage = color.GreenString(logMessage) - } - - c.logger.Debug(logMessage) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(op, err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - if c.level >= DebugNormal { - logMessage := string(out.Body) - if c.level >= DebugHumanized { - logMessage = color.HiYellowString(logMessage) - } - - c.logger.Debug(logMessage, "receive", true) - } - - err = json.Unmarshal(out.Body, &response) - if err != nil { - return nil, errors.E(op, err) - } - - for _, f := range response { - msg, err := c.parseFrame(f) - if err != nil { - return nil, errors.E(op, err) - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { - var ( - err error - frame jsonFrame - ) - - frame.ID = msg.ID - - if msg.Payloads != nil { - frame.Payloads, err = msg.Payloads.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Failure != nil { - frame.Failure, err = msg.Failure.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Command == nil { - return frame, nil - } - - frame.Command, err = commandName(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - frame.Options, err = json.Marshal(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - return frame, nil -} - -func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { - var ( - err error - msg Message - ) - - msg.ID = frame.ID - - if frame.Payloads != nil { - msg.Payloads = &common.Payloads{} - - err = msg.Payloads.Unmarshal(frame.Payloads) - if err != nil { - return Message{}, err - } - } - - if frame.Failure != nil { - msg.Failure = &failure.Failure{} - - err = msg.Failure.Unmarshal(frame.Failure) - if err != nil { - return Message{}, err - } - } - - if frame.Command != "" { - cmd, err := initCommand(frame.Command) - if err != nil { - return Message{}, err - } - - err = json.Unmarshal(frame.Options, &cmd) - if err != nil { - return Message{}, err - } - - msg.Command = cmd - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go deleted file mode 100644 index d5e0f49d..00000000 --- a/plugins/temporal/protocol/message.go +++ /dev/null @@ -1,334 +0,0 @@ -package protocol - -import ( - "time" - - "github.com/spiral/errors" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/sdk/activity" - bindings "go.temporal.io/sdk/internalbindings" - "go.temporal.io/sdk/workflow" -) - -const ( - getWorkerInfoCommand = "GetWorkerInfo" - - invokeActivityCommand = "InvokeActivity" - startWorkflowCommand = "StartWorkflow" - invokeSignalCommand = "InvokeSignal" - invokeQueryCommand = "InvokeQuery" - destroyWorkflowCommand = "DestroyWorkflow" - cancelWorkflowCommand = "CancelWorkflow" - getStackTraceCommand = "StackTrace" - - executeActivityCommand = "ExecuteActivity" - executeChildWorkflowCommand = "ExecuteChildWorkflow" - getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" - - newTimerCommand = "NewTimer" - sideEffectCommand = "SideEffect" - getVersionCommand = "GetVersion" - completeWorkflowCommand = "CompleteWorkflow" - continueAsNewCommand = "ContinueAsNew" - - signalExternalWorkflowCommand = "SignalExternalWorkflow" - cancelExternalWorkflowCommand = "CancelExternalWorkflow" - - cancelCommand = "Cancel" - panicCommand = "Panic" -) - -// GetWorkerInfo reads worker information. -type GetWorkerInfo struct{} - -// InvokeActivity invokes activity. -type InvokeActivity struct { - // Name defines activity name. - Name string `json:"name"` - - // Info contains execution context. - Info activity.Info `json:"info"` - - // HeartbeatDetails indicates that the payload also contains last heartbeat details. - HeartbeatDetails int `json:"heartbeatDetails,omitempty"` -} - -// StartWorkflow sends worker command to start workflow. -type StartWorkflow struct { - // Info to define workflow context. - Info *workflow.Info `json:"info"` - - // LastCompletion contains offset of last completion results. - LastCompletion int `json:"lastCompletion,omitempty"` -} - -// InvokeSignal invokes signal with a set of arguments. -type InvokeSignal struct { - // RunID workflow run id. - RunID string `json:"runId"` - - // Name of the signal. - Name string `json:"name"` -} - -// InvokeQuery invokes query with a set of arguments. -type InvokeQuery struct { - // RunID workflow run id. - RunID string `json:"runId"` - // Name of the query. - Name string `json:"name"` -} - -// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). -type CancelWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// DestroyWorkflow asks worker to offload workflow from memory. -type DestroyWorkflow struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// GetStackTrace asks worker to offload workflow from memory. -type GetStackTrace struct { - // RunID workflow run id. - RunID string `json:"runId"` -} - -// ExecuteActivity command by workflow worker. -type ExecuteActivity struct { - // Name defines activity name. - Name string `json:"name"` - // Options to run activity. - Options bindings.ExecuteActivityOptions `json:"options,omitempty"` -} - -// ExecuteChildWorkflow executes child workflow. -type ExecuteChildWorkflow struct { - // Name defines workflow name. - Name string `json:"name"` - // Options to run activity. - Options bindings.WorkflowOptions `json:"options,omitempty"` -} - -// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. -type GetChildWorkflowExecution struct { - // ID of child workflow command. - ID uint64 `json:"id"` -} - -// NewTimer starts new timer. -type NewTimer struct { - // Milliseconds defines timer duration. - Milliseconds int `json:"ms"` -} - -// SideEffect to be recorded into the history. -type SideEffect struct{} - -// GetVersion requests version marker. -type GetVersion struct { - ChangeID string `json:"changeID"` - MinSupported int `json:"minSupported"` - MaxSupported int `json:"maxSupported"` -} - -// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. -type CompleteWorkflow struct{} - -// ContinueAsNew restarts workflow with new running instance. -type ContinueAsNew struct { - // Result defines workflow execution result. - Name string `json:"name"` - - // Options for continued as new workflow. - Options struct { - TaskQueueName string - WorkflowExecutionTimeout time.Duration - WorkflowRunTimeout time.Duration - WorkflowTaskTimeout time.Duration - } `json:"options"` -} - -// SignalExternalWorkflow sends signal to external workflow. -type SignalExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` - Signal string `json:"signal"` - ChildWorkflowOnly bool `json:"childWorkflowOnly"` -} - -// CancelExternalWorkflow canceller external workflow. -type CancelExternalWorkflow struct { - Namespace string `json:"namespace"` - WorkflowID string `json:"workflowID"` - RunID string `json:"runID"` -} - -// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). -type Cancel struct { - // CommandIDs to be cancelled. - CommandIDs []uint64 `json:"ids"` -} - -// Panic triggers panic in workflow process. -type Panic struct { - // Message to include into the error. - Message string `json:"message"` -} - -// ActivityParams maps activity command to activity params. -func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { - params := bindings.ExecuteActivityParams{ - ExecuteActivityOptions: cmd.Options, - ActivityType: bindings.ActivityType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// WorkflowParams maps workflow command to workflow params. -func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { - params := bindings.ExecuteWorkflowParams{ - WorkflowOptions: cmd.Options, - WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, - Input: payloads, - } - - if params.TaskQueueName == "" { - params.TaskQueueName = env.WorkflowInfo().TaskQueueName - } - - return params -} - -// ToDuration converts timer command to time.Duration. -func (cmd NewTimer) ToDuration() time.Duration { - return time.Millisecond * time.Duration(cmd.Milliseconds) -} - -// returns command name (only for the commands sent to the worker) -func commandName(cmd interface{}) (string, error) { - const op = errors.Op("command_name") - switch cmd.(type) { - case GetWorkerInfo, *GetWorkerInfo: - return getWorkerInfoCommand, nil - case StartWorkflow, *StartWorkflow: - return startWorkflowCommand, nil - case InvokeSignal, *InvokeSignal: - return invokeSignalCommand, nil - case InvokeQuery, *InvokeQuery: - return invokeQueryCommand, nil - case DestroyWorkflow, *DestroyWorkflow: - return destroyWorkflowCommand, nil - case CancelWorkflow, *CancelWorkflow: - return cancelWorkflowCommand, nil - case GetStackTrace, *GetStackTrace: - return getStackTraceCommand, nil - case InvokeActivity, *InvokeActivity: - return invokeActivityCommand, nil - case ExecuteActivity, *ExecuteActivity: - return executeActivityCommand, nil - case ExecuteChildWorkflow, *ExecuteChildWorkflow: - return executeChildWorkflowCommand, nil - case GetChildWorkflowExecution, *GetChildWorkflowExecution: - return getChildWorkflowExecutionCommand, nil - case NewTimer, *NewTimer: - return newTimerCommand, nil - case GetVersion, *GetVersion: - return getVersionCommand, nil - case SideEffect, *SideEffect: - return sideEffectCommand, nil - case CompleteWorkflow, *CompleteWorkflow: - return completeWorkflowCommand, nil - case ContinueAsNew, *ContinueAsNew: - return continueAsNewCommand, nil - case SignalExternalWorkflow, *SignalExternalWorkflow: - return signalExternalWorkflowCommand, nil - case CancelExternalWorkflow, *CancelExternalWorkflow: - return cancelExternalWorkflowCommand, nil - case Cancel, *Cancel: - return cancelCommand, nil - case Panic, *Panic: - return panicCommand, nil - default: - return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) - } -} - -// reads command from binary payload -func initCommand(name string) (interface{}, error) { - const op = errors.Op("init_command") - switch name { - case getWorkerInfoCommand: - return &GetWorkerInfo{}, nil - - case startWorkflowCommand: - return &StartWorkflow{}, nil - - case invokeSignalCommand: - return &InvokeSignal{}, nil - - case invokeQueryCommand: - return &InvokeQuery{}, nil - - case destroyWorkflowCommand: - return &DestroyWorkflow{}, nil - - case cancelWorkflowCommand: - return &CancelWorkflow{}, nil - - case getStackTraceCommand: - return &GetStackTrace{}, nil - - case invokeActivityCommand: - return &InvokeActivity{}, nil - - case executeActivityCommand: - return &ExecuteActivity{}, nil - - case executeChildWorkflowCommand: - return &ExecuteChildWorkflow{}, nil - - case getChildWorkflowExecutionCommand: - return &GetChildWorkflowExecution{}, nil - - case newTimerCommand: - return &NewTimer{}, nil - - case getVersionCommand: - return &GetVersion{}, nil - - case sideEffectCommand: - return &SideEffect{}, nil - - case completeWorkflowCommand: - return &CompleteWorkflow{}, nil - - case continueAsNewCommand: - return &ContinueAsNew{}, nil - - case signalExternalWorkflowCommand: - return &SignalExternalWorkflow{}, nil - - case cancelExternalWorkflowCommand: - return &CancelExternalWorkflow{}, nil - - case cancelCommand: - return &Cancel{}, nil - - case panicCommand: - return &Panic{}, nil - - default: - return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) - } -} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go deleted file mode 100644 index 607fe0fe..00000000 --- a/plugins/temporal/protocol/proto_codec.go +++ /dev/null @@ -1,145 +0,0 @@ -package protocol - -import ( - v1 "github.com/golang/protobuf/proto" //nolint:staticcheck - jsoniter "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" - "google.golang.org/protobuf/proto" -) - -type ( - // ProtoCodec uses protobuf to exchange messages with underlying workers. - ProtoCodec struct { - } -) - -// NewProtoCodec creates new Proto communication codec. -func NewProtoCodec() Codec { - return &ProtoCodec{} -} - -// WithLogger creates new codes instance with attached logger. -func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { - return &ProtoCodec{} -} - -// GetName returns codec name. -func (c *ProtoCodec) GetName() string { - return "protobuf" -} - -// Execute exchanges commands with worker. -func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - if len(msg) == 0 { - return nil, nil - } - - var request = &internal.Frame{} - var response = &internal.Frame{} - var result = make([]Message, 0, 5) - var err error - - for _, m := range msg { - frame, err := c.packMessage(m) - if err != nil { - return nil, err - } - - request.Messages = append(request.Messages, frame) - } - - p := payload.Payload{} - - // context is always in json format - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = jsoniter.Marshal(ctx) - if err != nil { - return nil, errors.E(errors.Op("encodeContext"), err) - } - - p.Body, err = proto.Marshal(v1.MessageV2(request)) - if err != nil { - return nil, errors.E(errors.Op("encodePayload"), err) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(errors.Op("execute"), err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - err = proto.Unmarshal(out.Body, v1.MessageV2(response)) - if err != nil { - return nil, errors.E(errors.Op("parseResponse"), err) - } - - for _, f := range response.Messages { - msg, err := c.parseMessage(f) - if err != nil { - return nil, err - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { - var err error - - frame := &internal.Message{ - Id: msg.ID, - Payloads: msg.Payloads, - Failure: msg.Failure, - } - - if msg.Command != nil { - frame.Command, err = commandName(msg.Command) - if err != nil { - return nil, err - } - - frame.Options, err = jsoniter.Marshal(msg.Command) - if err != nil { - return nil, err - } - } - - return frame, nil -} - -func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { - const op = errors.Op("proto_codec_parse_message") - var err error - - msg := Message{ - ID: frame.Id, - Payloads: frame.Payloads, - Failure: frame.Failure, - } - - if frame.Command != "" { - msg.Command, err = initCommand(frame.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - - err = jsoniter.Unmarshal(frame.Options, &msg.Command) - if err != nil { - return Message{}, errors.E(op, err) - } - } - - return msg, nil -} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go deleted file mode 100644 index 53076fdf..00000000 --- a/plugins/temporal/protocol/protocol.go +++ /dev/null @@ -1,77 +0,0 @@ -package protocol - -import ( - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -const ( - // DebugNone disables all debug messages. - DebugNone = iota - - // DebugNormal renders all messages into console. - DebugNormal - - // DebugHumanized enables color highlights for messages. - DebugHumanized -) - -// Context provides worker information about currently. Context can be empty for server level commands. -type Context struct { - // TaskQueue associates message batch with the specific task queue in underlying worker. - TaskQueue string `json:"taskQueue,omitempty"` - - // TickTime associated current or historical time with message batch. - TickTime string `json:"tickTime,omitempty"` - - // Replay indicates that current message batch is historical. - Replay bool `json:"replay,omitempty"` -} - -// Message used to exchange the send commands and receive responses from underlying workers. -type Message struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command of the message in unmarshalled form. Pointer. - Command interface{} `json:"command,omitempty"` - - // Failure associated with command id. - Failure *failure.Failure `json:"failure,omitempty"` - - // Payloads contains message specific payloads in binary format. - Payloads *commonpb.Payloads `json:"payloads,omitempty"` -} - -// Codec manages payload encoding and decoding while communication with underlying worker. -type Codec interface { - // WithLogger creates new codes instance with attached logger. - WithLogger(logger.Logger) Codec - - // GetName returns codec name. - GetName() string - - // Execute sends message to worker and waits for the response. - Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) -} - -// Endpoint provides the ability to send and receive messages. -type Endpoint interface { - // ExecWithContext allow to set ExecTTL - Exec(p payload.Payload) (payload.Payload, error) -} - -// DebugLevel configures debug level. -type DebugLevel int - -// IsEmpty only check if task queue set. -func (ctx Context) IsEmpty() bool { - return ctx.TaskQueue == "" -} - -// IsCommand returns true if message carries request. -func (msg Message) IsCommand() bool { - return msg.Command != nil -} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go deleted file mode 100644 index 58a0ae66..00000000 --- a/plugins/temporal/protocol/worker_info.go +++ /dev/null @@ -1,72 +0,0 @@ -package protocol - -import ( - "github.com/spiral/errors" - "go.temporal.io/sdk/converter" - "go.temporal.io/sdk/worker" -) - -// WorkerInfo outlines information about every available worker and it's TaskQueues. - -// WorkerInfo lists available task queues, workflows and activities. -type WorkerInfo struct { - // TaskQueue assigned to the worker. - TaskQueue string `json:"taskQueue"` - - // Options describe worker options. - Options worker.Options `json:"options,omitempty"` - - // Workflows provided by the worker. - Workflows []WorkflowInfo - - // Activities provided by the worker. - Activities []ActivityInfo -} - -// WorkflowInfo describes single worker workflow. -type WorkflowInfo struct { - // Name of the workflow. - Name string `json:"name"` - - // Queries pre-defined for the workflow type. - Queries []string `json:"queries"` - - // Signals pre-defined for the workflow type. - Signals []string `json:"signals"` -} - -// ActivityInfo describes single worker activity. -type ActivityInfo struct { - // Name describes public activity name. - Name string `json:"name"` -} - -// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). -func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { - const op = errors.Op("fetch_worker_info") - - result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) - if err != nil { - return nil, errors.E(op, err) - } - - if len(result) != 1 { - return nil, errors.E(op, errors.Str("unable to read worker info")) - } - - if result[0].ID != 0 { - return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) - } - - var info []WorkerInfo - for i := range result[0].Payloads.Payloads { - wi := WorkerInfo{} - if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { - return nil, errors.E(op, err) - } - - info = append(info, wi) - } - - return info, nil -} |