diff options
Diffstat (limited to 'plugins/temporal/protocol')
-rw-r--r-- | plugins/temporal/protocol/converter.go | 76 | ||||
-rw-r--r-- | plugins/temporal/protocol/converter_test.go | 23 | ||||
-rw-r--r-- | plugins/temporal/protocol/internal/protocol.pb.go | 167 | ||||
-rw-r--r-- | plugins/temporal/protocol/json_codec.go | 225 | ||||
-rw-r--r-- | plugins/temporal/protocol/message.go | 334 | ||||
-rw-r--r-- | plugins/temporal/protocol/proto_codec.go | 145 | ||||
-rw-r--r-- | plugins/temporal/protocol/protocol.go | 77 | ||||
-rw-r--r-- | plugins/temporal/protocol/worker_info.go | 72 |
8 files changed, 1119 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go new file mode 100644 index 00000000..406e70f4 --- /dev/null +++ b/plugins/temporal/protocol/converter.go @@ -0,0 +1,76 @@ +package protocol + +import ( + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +type ( + // DataConverter wraps Temporal data converter to enable direct access to the payloads. + DataConverter struct { + fallback converter.DataConverter + } +) + +// NewDataConverter creates new data converter. +func NewDataConverter(fallback converter.DataConverter) converter.DataConverter { + return &DataConverter{fallback: fallback} +} + +// ToPayloads converts a list of values. +func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { + for _, v := range values { + if aggregated, ok := v.(*commonpb.Payloads); ok { + // bypassing + return aggregated, nil + } + } + + return r.fallback.ToPayloads(values...) +} + +// ToPayload converts single value to payload. +func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + return r.fallback.ToPayload(value) +} + +// FromPayloads converts to a list of values of different types. +// Useful for deserializing arguments of function invocations. +func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + if payloads == nil { + return nil + } + + if len(valuePtrs) == 1 { + // input proxying + if input, ok := valuePtrs[0].(**commonpb.Payloads); ok { + *input = &commonpb.Payloads{} + (*input).Payloads = payloads.Payloads + return nil + } + } + + for i := 0; i < len(payloads.Payloads); i++ { + err := r.FromPayload(payloads.Payloads[i], valuePtrs[i]) + if err != nil { + return err + } + } + + return nil +} + +// FromPayload converts single value from payload. +func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { + return r.fallback.FromPayload(payload, valuePtr) +} + +// ToString converts payload object into human readable string. +func (r *DataConverter) ToString(input *commonpb.Payload) string { + return r.fallback.ToString(input) +} + +// ToStrings converts payloads object into human readable strings. +func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string { + return r.fallback.ToStrings(input) +} diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go new file mode 100644 index 00000000..6ce9fa0f --- /dev/null +++ b/plugins/temporal/protocol/converter_test.go @@ -0,0 +1,23 @@ +package protocol + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/converter" +) + +func Test_Passthough(t *testing.T) { + codec := NewDataConverter(converter.GetDefaultDataConverter()) + + value, err := codec.ToPayloads("test") + assert.NoError(t, err) + + out := &common.Payloads{} + + assert.Len(t, out.Payloads, 0) + assert.NoError(t, codec.FromPayloads(value, &out)) + + assert.Len(t, out.Payloads, 1) +} diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go new file mode 100644 index 00000000..c554e28f --- /dev/null +++ b/plugins/temporal/protocol/internal/protocol.pb.go @@ -0,0 +1,167 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protocol.proto + +package internal + +import ( + fmt "fmt" + math "math" + + proto "github.com/golang/protobuf/proto" + v11 "go.temporal.io/api/common/v1" + v1 "go.temporal.io/api/failure/v1" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Frame struct { + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Frame) Reset() { *m = Frame{} } +func (m *Frame) String() string { return proto.CompactTextString(m) } +func (*Frame) ProtoMessage() {} +func (*Frame) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{0} +} + +func (m *Frame) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Frame.Unmarshal(m, b) +} +func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Frame.Marshal(b, m, deterministic) +} +func (m *Frame) XXX_Merge(src proto.Message) { + xxx_messageInfo_Frame.Merge(m, src) +} +func (m *Frame) XXX_Size() int { + return xxx_messageInfo_Frame.Size(m) +} +func (m *Frame) XXX_DiscardUnknown() { + xxx_messageInfo_Frame.DiscardUnknown(m) +} + +var xxx_messageInfo_Frame proto.InternalMessageInfo + +func (m *Frame) GetMessages() []*Message { + if m != nil { + return m.Messages + } + return nil +} + +// Single communication message. +type Message struct { + Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + // command name (if any) + Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + // command options in json format. + Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + // error response. + Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"` + // invocation or result payloads. + Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_2bc2336598a3f7e0, []int{1} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetId() uint64 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Message) GetCommand() string { + if m != nil { + return m.Command + } + return "" +} + +func (m *Message) GetOptions() []byte { + if m != nil { + return m.Options + } + return nil +} + +func (m *Message) GetFailure() *v1.Failure { + if m != nil { + return m.Failure + } + return nil +} + +func (m *Message) GetPayloads() *v11.Payloads { + if m != nil { + return m.Payloads + } + return nil +} + +func init() { + proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame") + proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message") +} + +func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) } + +var fileDescriptor_2bc2336598a3f7e0 = []byte{ + // 257 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31, + 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec, + 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92, + 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef, + 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c, + 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56, + 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3, + 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47, + 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b, + 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2, + 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf, + 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a, + 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4, + 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43, + 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef, + 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00, + 0x00, +} diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go new file mode 100644 index 00000000..e7a77068 --- /dev/null +++ b/plugins/temporal/protocol/json_codec.go @@ -0,0 +1,225 @@ +package protocol + +import ( + "github.com/fatih/color" + j "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +// JSONCodec can be used for debugging and log capturing reasons. +type JSONCodec struct { + // level enables verbose logging or all incoming and outcoming messages. + level DebugLevel + + // logger renders messages when debug enabled. + logger logger.Logger +} + +// jsonFrame contains message command in binary form. +type jsonFrame struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command name. Optional. + Command string `json:"command,omitempty"` + + // Options to be unmarshalled to body (raw payload). + Options j.RawMessage `json:"options,omitempty"` + + // Failure associated with command id. + Failure []byte `json:"failure,omitempty"` + + // Payloads specific to the command or result. + Payloads []byte `json:"payloads,omitempty"` +} + +// NewJSONCodec creates new Json communication codec. +func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { + return &JSONCodec{ + level: level, + logger: logger, + } +} + +// WithLogger creates new codes instance with attached logger. +func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { + return &JSONCodec{ + level: c.level, + logger: logger, + } +} + +// GetName returns codec name. +func (c *JSONCodec) GetName() string { + return "json" +} + +// Execute exchanges commands with worker. +func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + const op = errors.Op("json_codec_execute") + if len(msg) == 0 { + return nil, nil + } + + var response = make([]jsonFrame, 0, 5) + var result = make([]Message, 0, 5) + var err error + + frames := make([]jsonFrame, 0, len(msg)) + for _, m := range msg { + frame, err := c.packFrame(m) + if err != nil { + return nil, errors.E(op, err) + } + + frames = append(frames, frame) + } + + p := payload.Payload{} + + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = json.Marshal(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + p.Body, err = json.Marshal(frames) + if err != nil { + return nil, errors.E(op, err) + } + + if c.level >= DebugNormal { + logMessage := string(p.Body) + " " + string(p.Context) + if c.level >= DebugHumanized { + logMessage = color.GreenString(logMessage) + } + + c.logger.Debug(logMessage) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(op, err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + if c.level >= DebugNormal { + logMessage := string(out.Body) + if c.level >= DebugHumanized { + logMessage = color.HiYellowString(logMessage) + } + + c.logger.Debug(logMessage, "receive", true) + } + + err = json.Unmarshal(out.Body, &response) + if err != nil { + return nil, errors.E(op, err) + } + + for _, f := range response { + msg, err := c.parseFrame(f) + if err != nil { + return nil, errors.E(op, err) + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { + var ( + err error + frame jsonFrame + ) + + frame.ID = msg.ID + + if msg.Payloads != nil { + frame.Payloads, err = msg.Payloads.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Failure != nil { + frame.Failure, err = msg.Failure.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Command == nil { + return frame, nil + } + + frame.Command, err = commandName(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + frame.Options, err = json.Marshal(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + return frame, nil +} + +func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { + var ( + err error + msg Message + ) + + msg.ID = frame.ID + + if frame.Payloads != nil { + msg.Payloads = &common.Payloads{} + + err = msg.Payloads.Unmarshal(frame.Payloads) + if err != nil { + return Message{}, err + } + } + + if frame.Failure != nil { + msg.Failure = &failure.Failure{} + + err = msg.Failure.Unmarshal(frame.Failure) + if err != nil { + return Message{}, err + } + } + + if frame.Command != "" { + cmd, err := initCommand(frame.Command) + if err != nil { + return Message{}, err + } + + err = json.Unmarshal(frame.Options, &cmd) + if err != nil { + return Message{}, err + } + + msg.Command = cmd + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go new file mode 100644 index 00000000..d5e0f49d --- /dev/null +++ b/plugins/temporal/protocol/message.go @@ -0,0 +1,334 @@ +package protocol + +import ( + "time" + + "github.com/spiral/errors" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/sdk/activity" + bindings "go.temporal.io/sdk/internalbindings" + "go.temporal.io/sdk/workflow" +) + +const ( + getWorkerInfoCommand = "GetWorkerInfo" + + invokeActivityCommand = "InvokeActivity" + startWorkflowCommand = "StartWorkflow" + invokeSignalCommand = "InvokeSignal" + invokeQueryCommand = "InvokeQuery" + destroyWorkflowCommand = "DestroyWorkflow" + cancelWorkflowCommand = "CancelWorkflow" + getStackTraceCommand = "StackTrace" + + executeActivityCommand = "ExecuteActivity" + executeChildWorkflowCommand = "ExecuteChildWorkflow" + getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" + + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + continueAsNewCommand = "ContinueAsNew" + + signalExternalWorkflowCommand = "SignalExternalWorkflow" + cancelExternalWorkflowCommand = "CancelExternalWorkflow" + + cancelCommand = "Cancel" + panicCommand = "Panic" +) + +// GetWorkerInfo reads worker information. +type GetWorkerInfo struct{} + +// InvokeActivity invokes activity. +type InvokeActivity struct { + // Name defines activity name. + Name string `json:"name"` + + // Info contains execution context. + Info activity.Info `json:"info"` + + // HeartbeatDetails indicates that the payload also contains last heartbeat details. + HeartbeatDetails int `json:"heartbeatDetails,omitempty"` +} + +// StartWorkflow sends worker command to start workflow. +type StartWorkflow struct { + // Info to define workflow context. + Info *workflow.Info `json:"info"` + + // LastCompletion contains offset of last completion results. + LastCompletion int `json:"lastCompletion,omitempty"` +} + +// InvokeSignal invokes signal with a set of arguments. +type InvokeSignal struct { + // RunID workflow run id. + RunID string `json:"runId"` + + // Name of the signal. + Name string `json:"name"` +} + +// InvokeQuery invokes query with a set of arguments. +type InvokeQuery struct { + // RunID workflow run id. + RunID string `json:"runId"` + // Name of the query. + Name string `json:"name"` +} + +// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal). +type CancelWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// DestroyWorkflow asks worker to offload workflow from memory. +type DestroyWorkflow struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// GetStackTrace asks worker to offload workflow from memory. +type GetStackTrace struct { + // RunID workflow run id. + RunID string `json:"runId"` +} + +// ExecuteActivity command by workflow worker. +type ExecuteActivity struct { + // Name defines activity name. + Name string `json:"name"` + // Options to run activity. + Options bindings.ExecuteActivityOptions `json:"options,omitempty"` +} + +// ExecuteChildWorkflow executes child workflow. +type ExecuteChildWorkflow struct { + // Name defines workflow name. + Name string `json:"name"` + // Options to run activity. + Options bindings.WorkflowOptions `json:"options,omitempty"` +} + +// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow. +type GetChildWorkflowExecution struct { + // ID of child workflow command. + ID uint64 `json:"id"` +} + +// NewTimer starts new timer. +type NewTimer struct { + // Milliseconds defines timer duration. + Milliseconds int `json:"ms"` +} + +// SideEffect to be recorded into the history. +type SideEffect struct{} + +// GetVersion requests version marker. +type GetVersion struct { + ChangeID string `json:"changeID"` + MinSupported int `json:"minSupported"` + MaxSupported int `json:"maxSupported"` +} + +// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload. +type CompleteWorkflow struct{} + +// ContinueAsNew restarts workflow with new running instance. +type ContinueAsNew struct { + // Result defines workflow execution result. + Name string `json:"name"` + + // Options for continued as new workflow. + Options struct { + TaskQueueName string + WorkflowExecutionTimeout time.Duration + WorkflowRunTimeout time.Duration + WorkflowTaskTimeout time.Duration + } `json:"options"` +} + +// SignalExternalWorkflow sends signal to external workflow. +type SignalExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` + Signal string `json:"signal"` + ChildWorkflowOnly bool `json:"childWorkflowOnly"` +} + +// CancelExternalWorkflow canceller external workflow. +type CancelExternalWorkflow struct { + Namespace string `json:"namespace"` + WorkflowID string `json:"workflowID"` + RunID string `json:"runID"` +} + +// Cancel one or multiple internal promises (activities, local activities, timers, child workflows). +type Cancel struct { + // CommandIDs to be cancelled. + CommandIDs []uint64 `json:"ids"` +} + +// Panic triggers panic in workflow process. +type Panic struct { + // Message to include into the error. + Message string `json:"message"` +} + +// ActivityParams maps activity command to activity params. +func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams { + params := bindings.ExecuteActivityParams{ + ExecuteActivityOptions: cmd.Options, + ActivityType: bindings.ActivityType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// WorkflowParams maps workflow command to workflow params. +func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams { + params := bindings.ExecuteWorkflowParams{ + WorkflowOptions: cmd.Options, + WorkflowType: &bindings.WorkflowType{Name: cmd.Name}, + Input: payloads, + } + + if params.TaskQueueName == "" { + params.TaskQueueName = env.WorkflowInfo().TaskQueueName + } + + return params +} + +// ToDuration converts timer command to time.Duration. +func (cmd NewTimer) ToDuration() time.Duration { + return time.Millisecond * time.Duration(cmd.Milliseconds) +} + +// returns command name (only for the commands sent to the worker) +func commandName(cmd interface{}) (string, error) { + const op = errors.Op("command_name") + switch cmd.(type) { + case GetWorkerInfo, *GetWorkerInfo: + return getWorkerInfoCommand, nil + case StartWorkflow, *StartWorkflow: + return startWorkflowCommand, nil + case InvokeSignal, *InvokeSignal: + return invokeSignalCommand, nil + case InvokeQuery, *InvokeQuery: + return invokeQueryCommand, nil + case DestroyWorkflow, *DestroyWorkflow: + return destroyWorkflowCommand, nil + case CancelWorkflow, *CancelWorkflow: + return cancelWorkflowCommand, nil + case GetStackTrace, *GetStackTrace: + return getStackTraceCommand, nil + case InvokeActivity, *InvokeActivity: + return invokeActivityCommand, nil + case ExecuteActivity, *ExecuteActivity: + return executeActivityCommand, nil + case ExecuteChildWorkflow, *ExecuteChildWorkflow: + return executeChildWorkflowCommand, nil + case GetChildWorkflowExecution, *GetChildWorkflowExecution: + return getChildWorkflowExecutionCommand, nil + case NewTimer, *NewTimer: + return newTimerCommand, nil + case GetVersion, *GetVersion: + return getVersionCommand, nil + case SideEffect, *SideEffect: + return sideEffectCommand, nil + case CompleteWorkflow, *CompleteWorkflow: + return completeWorkflowCommand, nil + case ContinueAsNew, *ContinueAsNew: + return continueAsNewCommand, nil + case SignalExternalWorkflow, *SignalExternalWorkflow: + return signalExternalWorkflowCommand, nil + case CancelExternalWorkflow, *CancelExternalWorkflow: + return cancelExternalWorkflowCommand, nil + case Cancel, *Cancel: + return cancelCommand, nil + case Panic, *Panic: + return panicCommand, nil + default: + return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd)) + } +} + +// reads command from binary payload +func initCommand(name string) (interface{}, error) { + const op = errors.Op("init_command") + switch name { + case getWorkerInfoCommand: + return &GetWorkerInfo{}, nil + + case startWorkflowCommand: + return &StartWorkflow{}, nil + + case invokeSignalCommand: + return &InvokeSignal{}, nil + + case invokeQueryCommand: + return &InvokeQuery{}, nil + + case destroyWorkflowCommand: + return &DestroyWorkflow{}, nil + + case cancelWorkflowCommand: + return &CancelWorkflow{}, nil + + case getStackTraceCommand: + return &GetStackTrace{}, nil + + case invokeActivityCommand: + return &InvokeActivity{}, nil + + case executeActivityCommand: + return &ExecuteActivity{}, nil + + case executeChildWorkflowCommand: + return &ExecuteChildWorkflow{}, nil + + case getChildWorkflowExecutionCommand: + return &GetChildWorkflowExecution{}, nil + + case newTimerCommand: + return &NewTimer{}, nil + + case getVersionCommand: + return &GetVersion{}, nil + + case sideEffectCommand: + return &SideEffect{}, nil + + case completeWorkflowCommand: + return &CompleteWorkflow{}, nil + + case continueAsNewCommand: + return &ContinueAsNew{}, nil + + case signalExternalWorkflowCommand: + return &SignalExternalWorkflow{}, nil + + case cancelExternalWorkflowCommand: + return &CancelExternalWorkflow{}, nil + + case cancelCommand: + return &Cancel{}, nil + + case panicCommand: + return &Panic{}, nil + + default: + return nil, errors.E(op, errors.Errorf("undefined command name: %s", name)) + } +} diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go new file mode 100644 index 00000000..607fe0fe --- /dev/null +++ b/plugins/temporal/protocol/proto_codec.go @@ -0,0 +1,145 @@ +package protocol + +import ( + v1 "github.com/golang/protobuf/proto" //nolint:staticcheck + jsoniter "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal" + "google.golang.org/protobuf/proto" +) + +type ( + // ProtoCodec uses protobuf to exchange messages with underlying workers. + ProtoCodec struct { + } +) + +// NewProtoCodec creates new Proto communication codec. +func NewProtoCodec() Codec { + return &ProtoCodec{} +} + +// WithLogger creates new codes instance with attached logger. +func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec { + return &ProtoCodec{} +} + +// GetName returns codec name. +func (c *ProtoCodec) GetName() string { + return "protobuf" +} + +// Execute exchanges commands with worker. +func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + if len(msg) == 0 { + return nil, nil + } + + var request = &internal.Frame{} + var response = &internal.Frame{} + var result = make([]Message, 0, 5) + var err error + + for _, m := range msg { + frame, err := c.packMessage(m) + if err != nil { + return nil, err + } + + request.Messages = append(request.Messages, frame) + } + + p := payload.Payload{} + + // context is always in json format + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = jsoniter.Marshal(ctx) + if err != nil { + return nil, errors.E(errors.Op("encodeContext"), err) + } + + p.Body, err = proto.Marshal(v1.MessageV2(request)) + if err != nil { + return nil, errors.E(errors.Op("encodePayload"), err) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(errors.Op("execute"), err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + err = proto.Unmarshal(out.Body, v1.MessageV2(response)) + if err != nil { + return nil, errors.E(errors.Op("parseResponse"), err) + } + + for _, f := range response.Messages { + msg, err := c.parseMessage(f) + if err != nil { + return nil, err + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) { + var err error + + frame := &internal.Message{ + Id: msg.ID, + Payloads: msg.Payloads, + Failure: msg.Failure, + } + + if msg.Command != nil { + frame.Command, err = commandName(msg.Command) + if err != nil { + return nil, err + } + + frame.Options, err = jsoniter.Marshal(msg.Command) + if err != nil { + return nil, err + } + } + + return frame, nil +} + +func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) { + const op = errors.Op("proto_codec_parse_message") + var err error + + msg := Message{ + ID: frame.Id, + Payloads: frame.Payloads, + Failure: frame.Failure, + } + + if frame.Command != "" { + msg.Command, err = initCommand(frame.Command) + if err != nil { + return Message{}, errors.E(op, err) + } + + err = jsoniter.Unmarshal(frame.Options, &msg.Command) + if err != nil { + return Message{}, errors.E(op, err) + } + } + + return msg, nil +} diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go new file mode 100644 index 00000000..53076fdf --- /dev/null +++ b/plugins/temporal/protocol/protocol.go @@ -0,0 +1,77 @@ +package protocol + +import ( + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +const ( + // DebugNone disables all debug messages. + DebugNone = iota + + // DebugNormal renders all messages into console. + DebugNormal + + // DebugHumanized enables color highlights for messages. + DebugHumanized +) + +// Context provides worker information about currently. Context can be empty for server level commands. +type Context struct { + // TaskQueue associates message batch with the specific task queue in underlying worker. + TaskQueue string `json:"taskQueue,omitempty"` + + // TickTime associated current or historical time with message batch. + TickTime string `json:"tickTime,omitempty"` + + // Replay indicates that current message batch is historical. + Replay bool `json:"replay,omitempty"` +} + +// Message used to exchange the send commands and receive responses from underlying workers. +type Message struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command of the message in unmarshalled form. Pointer. + Command interface{} `json:"command,omitempty"` + + // Failure associated with command id. + Failure *failure.Failure `json:"failure,omitempty"` + + // Payloads contains message specific payloads in binary format. + Payloads *commonpb.Payloads `json:"payloads,omitempty"` +} + +// Codec manages payload encoding and decoding while communication with underlying worker. +type Codec interface { + // WithLogger creates new codes instance with attached logger. + WithLogger(logger.Logger) Codec + + // GetName returns codec name. + GetName() string + + // Execute sends message to worker and waits for the response. + Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) +} + +// Endpoint provides the ability to send and receive messages. +type Endpoint interface { + // ExecWithContext allow to set ExecTTL + Exec(p payload.Payload) (payload.Payload, error) +} + +// DebugLevel configures debug level. +type DebugLevel int + +// IsEmpty only check if task queue set. +func (ctx Context) IsEmpty() bool { + return ctx.TaskQueue == "" +} + +// IsCommand returns true if message carries request. +func (msg Message) IsCommand() bool { + return msg.Command != nil +} diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go new file mode 100644 index 00000000..58a0ae66 --- /dev/null +++ b/plugins/temporal/protocol/worker_info.go @@ -0,0 +1,72 @@ +package protocol + +import ( + "github.com/spiral/errors" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/worker" +) + +// WorkerInfo outlines information about every available worker and it's TaskQueues. + +// WorkerInfo lists available task queues, workflows and activities. +type WorkerInfo struct { + // TaskQueue assigned to the worker. + TaskQueue string `json:"taskQueue"` + + // Options describe worker options. + Options worker.Options `json:"options,omitempty"` + + // Workflows provided by the worker. + Workflows []WorkflowInfo + + // Activities provided by the worker. + Activities []ActivityInfo +} + +// WorkflowInfo describes single worker workflow. +type WorkflowInfo struct { + // Name of the workflow. + Name string `json:"name"` + + // Queries pre-defined for the workflow type. + Queries []string `json:"queries"` + + // Signals pre-defined for the workflow type. + Signals []string `json:"signals"` +} + +// ActivityInfo describes single worker activity. +type ActivityInfo struct { + // Name describes public activity name. + Name string `json:"name"` +} + +// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process). +func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) { + const op = errors.Op("fetch_worker_info") + + result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}}) + if err != nil { + return nil, errors.E(op, err) + } + + if len(result) != 1 { + return nil, errors.E(op, errors.Str("unable to read worker info")) + } + + if result[0].ID != 0 { + return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing")) + } + + var info []WorkerInfo + for i := range result[0].Payloads.Payloads { + wi := WorkerInfo{} + if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil { + return nil, errors.E(op, err) + } + + info = append(info, wi) + } + + return info, nil +} |