summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
committerValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
commit43071e43a0743ff8c7913bba7819952962124355 (patch)
treee3b61113d3c0d28f972c71592af8b2f708994167 /plugins/temporal/protocol
parent5fd1168c687040ca7d72f4727ee1aec753d3f258 (diff)
Initial commit of the Temporal plugins set
Diffstat (limited to 'plugins/temporal/protocol')
-rw-r--r--plugins/temporal/protocol/converter.go76
-rw-r--r--plugins/temporal/protocol/converter_test.go23
-rw-r--r--plugins/temporal/protocol/internal/protocol.pb.go167
-rw-r--r--plugins/temporal/protocol/json_codec.go226
-rw-r--r--plugins/temporal/protocol/message.go333
-rw-r--r--plugins/temporal/protocol/proto_codec.go144
-rw-r--r--plugins/temporal/protocol/protocol.go77
-rw-r--r--plugins/temporal/protocol/worker_info.go72
8 files changed, 1118 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
new file mode 100644
index 00000000..406e70f4
--- /dev/null
+++ b/plugins/temporal/protocol/converter.go
@@ -0,0 +1,76 @@
+package protocol
+
+import (
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+type (
+ // DataConverter wraps Temporal data converter to enable direct access to the payloads.
+ DataConverter struct {
+ fallback converter.DataConverter
+ }
+)
+
+// NewDataConverter creates new data converter.
+func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
+ return &DataConverter{fallback: fallback}
+}
+
+// ToPayloads converts a list of values.
+func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
+ for _, v := range values {
+ if aggregated, ok := v.(*commonpb.Payloads); ok {
+ // bypassing
+ return aggregated, nil
+ }
+ }
+
+ return r.fallback.ToPayloads(values...)
+}
+
+// ToPayload converts single value to payload.
+func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
+ return r.fallback.ToPayload(value)
+}
+
+// FromPayloads converts to a list of values of different types.
+// Useful for deserializing arguments of function invocations.
+func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
+ if payloads == nil {
+ return nil
+ }
+
+ if len(valuePtrs) == 1 {
+ // input proxying
+ if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
+ *input = &commonpb.Payloads{}
+ (*input).Payloads = payloads.Payloads
+ return nil
+ }
+ }
+
+ for i := 0; i < len(payloads.Payloads); i++ {
+ err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// FromPayload converts single value from payload.
+func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
+ return r.fallback.FromPayload(payload, valuePtr)
+}
+
+// ToString converts payload object into human readable string.
+func (r *DataConverter) ToString(input *commonpb.Payload) string {
+ return r.fallback.ToString(input)
+}
+
+// ToStrings converts payloads object into human readable strings.
+func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
+ return r.fallback.ToStrings(input)
+}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
new file mode 100644
index 00000000..6ce9fa0f
--- /dev/null
+++ b/plugins/temporal/protocol/converter_test.go
@@ -0,0 +1,23 @@
+package protocol
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/converter"
+)
+
+func Test_Passthough(t *testing.T) {
+ codec := NewDataConverter(converter.GetDefaultDataConverter())
+
+ value, err := codec.ToPayloads("test")
+ assert.NoError(t, err)
+
+ out := &common.Payloads{}
+
+ assert.Len(t, out.Payloads, 0)
+ assert.NoError(t, codec.FromPayloads(value, &out))
+
+ assert.Len(t, out.Payloads, 1)
+}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
new file mode 100644
index 00000000..c554e28f
--- /dev/null
+++ b/plugins/temporal/protocol/internal/protocol.pb.go
@@ -0,0 +1,167 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: protocol.proto
+
+package internal
+
+import (
+ fmt "fmt"
+ math "math"
+
+ proto "github.com/golang/protobuf/proto"
+ v11 "go.temporal.io/api/common/v1"
+ v1 "go.temporal.io/api/failure/v1"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type Frame struct {
+ Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Frame) Reset() { *m = Frame{} }
+func (m *Frame) String() string { return proto.CompactTextString(m) }
+func (*Frame) ProtoMessage() {}
+func (*Frame) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{0}
+}
+
+func (m *Frame) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Frame.Unmarshal(m, b)
+}
+func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
+}
+func (m *Frame) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Frame.Merge(m, src)
+}
+func (m *Frame) XXX_Size() int {
+ return xxx_messageInfo_Frame.Size(m)
+}
+func (m *Frame) XXX_DiscardUnknown() {
+ xxx_messageInfo_Frame.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Frame proto.InternalMessageInfo
+
+func (m *Frame) GetMessages() []*Message {
+ if m != nil {
+ return m.Messages
+ }
+ return nil
+}
+
+// Single communication message.
+type Message struct {
+ Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
+ // command name (if any)
+ Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
+ // command options in json format.
+ Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
+ // error response.
+ Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
+ // invocation or result payloads.
+ Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Message) Reset() { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage() {}
+func (*Message) Descriptor() ([]byte, []int) {
+ return fileDescriptor_2bc2336598a3f7e0, []int{1}
+}
+
+func (m *Message) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Message.Unmarshal(m, b)
+}
+func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Message.Marshal(b, m, deterministic)
+}
+func (m *Message) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Message.Merge(m, src)
+}
+func (m *Message) XXX_Size() int {
+ return xxx_messageInfo_Message.Size(m)
+}
+func (m *Message) XXX_DiscardUnknown() {
+ xxx_messageInfo_Message.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Message proto.InternalMessageInfo
+
+func (m *Message) GetId() uint64 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
+func (m *Message) GetCommand() string {
+ if m != nil {
+ return m.Command
+ }
+ return ""
+}
+
+func (m *Message) GetOptions() []byte {
+ if m != nil {
+ return m.Options
+ }
+ return nil
+}
+
+func (m *Message) GetFailure() *v1.Failure {
+ if m != nil {
+ return m.Failure
+ }
+ return nil
+}
+
+func (m *Message) GetPayloads() *v11.Payloads {
+ if m != nil {
+ return m.Payloads
+ }
+ return nil
+}
+
+func init() {
+ proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
+ proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
+}
+
+func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
+
+var fileDescriptor_2bc2336598a3f7e0 = []byte{
+ // 257 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
+ 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
+ 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
+ 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
+ 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
+ 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
+ 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
+ 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
+ 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
+ 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
+ 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
+ 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
+ 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
+ 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
+ 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
+ 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
+ 0x00,
+}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
new file mode 100644
index 00000000..dae3a7d0
--- /dev/null
+++ b/plugins/temporal/protocol/json_codec.go
@@ -0,0 +1,226 @@
+package protocol
+
+import (
+ "github.com/fatih/color"
+ jsoniter "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+type (
+ // JSONCodec can be used for debugging and log capturing reasons.
+ JSONCodec struct {
+ // level enables verbose logging or all incoming and outcoming messages.
+ level DebugLevel
+
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+ }
+
+ // jsonFrame contains message command in binary form.
+ jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
+
+ // Options to be unmarshalled to body (raw payload).
+ Options jsoniter.RawMessage `json:"options,omitempty"`
+
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+ }
+)
+
+// NewJSONCodec creates new Json communication codec.
+func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: level,
+ logger: logger,
+ }
+}
+
+// WithLogger creates new codes instance with attached logger.
+func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: c.level,
+ logger: logger,
+ }
+}
+
+// GetName returns codec name.
+func (c *JSONCodec) GetName() string {
+ return "json"
+}
+
+// Execute exchanges commands with worker.
+func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var (
+ response = make([]jsonFrame, 0, 5)
+ result = make([]Message, 0, 5)
+ err error
+ )
+
+ frames := make([]jsonFrame, 0, len(msg))
+ for _, m := range msg {
+ frame, err := c.packFrame(m)
+ if err != nil {
+ return nil, err
+ }
+
+ frames = append(frames, frame)
+ }
+
+ p := payload.Payload{}
+
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = jsoniter.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(errors.Op("encodeContext"), err)
+ }
+
+ p.Body, err = jsoniter.Marshal(frames)
+ if err != nil {
+ return nil, errors.E(errors.Op("encodePayload"), err)
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(p.Body) + " " + string(p.Context)
+ if c.level >= DebugHumanized {
+ logMessage = color.GreenString(logMessage)
+ }
+
+ c.logger.Debug(logMessage)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(errors.Op("execute"), err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(out.Body)
+ if c.level >= DebugHumanized {
+ logMessage = color.HiYellowString(logMessage)
+ }
+
+ c.logger.Debug(logMessage, "receive", true)
+ }
+
+ err = jsoniter.Unmarshal(out.Body, &response)
+ if err != nil {
+ return nil, errors.E(errors.Op("parseResponse"), err)
+ }
+
+ for _, f := range response {
+ msg, err := c.parseFrame(f)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
+ var (
+ err error
+ frame jsonFrame
+ )
+
+ frame.ID = msg.ID
+
+ if msg.Payloads != nil {
+ frame.Payloads, err = msg.Payloads.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Failure != nil {
+ frame.Failure, err = msg.Failure.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Command == nil {
+ return frame, nil
+ }
+
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ frame.Options, err = jsoniter.Marshal(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ return frame, nil
+}
+
+func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
+ var (
+ err error
+ msg Message
+ )
+
+ msg.ID = frame.ID
+
+ if frame.Payloads != nil {
+ msg.Payloads = &common.Payloads{}
+
+ err = msg.Payloads.Unmarshal(frame.Payloads)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Failure != nil {
+ msg.Failure = &failure.Failure{}
+
+ err = msg.Failure.Unmarshal(frame.Failure)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Command != "" {
+ cmd, err := initCommand(frame.Command)
+ if err != nil {
+ return Message{}, err
+ }
+
+ err = jsoniter.Unmarshal(frame.Options, &cmd)
+ if err != nil {
+ return Message{}, err
+ }
+
+ msg.Command = cmd
+ }
+
+ return msg, nil
+}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
new file mode 100644
index 00000000..4568fd1d
--- /dev/null
+++ b/plugins/temporal/protocol/message.go
@@ -0,0 +1,333 @@
+package protocol
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/sdk/activity"
+ bindings "go.temporal.io/sdk/internalbindings"
+ "go.temporal.io/sdk/workflow"
+)
+
+const (
+ getWorkerInfoCommand = "GetWorkerInfo"
+
+ invokeActivityCommand = "InvokeActivity"
+ startWorkflowCommand = "StartWorkflow"
+ invokeSignalCommand = "InvokeSignal"
+ invokeQueryCommand = "InvokeQuery"
+ destroyWorkflowCommand = "DestroyWorkflow"
+ cancelWorkflowCommand = "CancelWorkflow"
+ getStackTraceCommand = "StackTrace"
+
+ executeActivityCommand = "ExecuteActivity"
+ executeChildWorkflowCommand = "ExecuteChildWorkflow"
+ getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
+
+ newTimerCommand = "NewTimer"
+ sideEffectCommand = "SideEffect"
+ getVersionCommand = "GetVersion"
+ completeWorkflowCommand = "CompleteWorkflow"
+ continueAsNewCommand = "ContinueAsNew"
+
+ signalExternalWorkflowCommand = "SignalExternalWorkflow"
+ cancelExternalWorkflowCommand = "CancelExternalWorkflow"
+
+ cancelCommand = "Cancel"
+ panicCommand = "Panic"
+)
+
+// GetWorkerInfo reads worker information.
+type GetWorkerInfo struct {
+}
+
+// InvokeActivity invokes activity.
+type InvokeActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+
+ // Info contains execution context.
+ Info activity.Info `json:"info"`
+
+ // HeartbeatDetails indicates that the payload also contains last heartbeat details.
+ HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
+}
+
+// StartWorkflow sends worker command to start workflow.
+type StartWorkflow struct {
+ // Info to define workflow context.
+ Info *workflow.Info `json:"info"`
+
+ // LastCompletion contains offset of last completion results.
+ LastCompletion int `json:"lastCompletion,omitempty"`
+}
+
+// InvokeSignal invokes signal with a set of arguments.
+type InvokeSignal struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+
+ // Name of the signal.
+ Name string `json:"name"`
+}
+
+// InvokeQuery invokes query with a set of arguments.
+type InvokeQuery struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+ // Name of the query.
+ Name string `json:"name"`
+}
+
+// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
+type CancelWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// DestroyWorkflow asks worker to offload workflow from memory.
+type DestroyWorkflow struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// GetStackTrace asks worker to offload workflow from memory.
+type GetStackTrace struct {
+ // RunID workflow run id.
+ RunID string `json:"runId"`
+}
+
+// ExecuteActivity command by workflow worker.
+type ExecuteActivity struct {
+ // Name defines activity name.
+ Name string `json:"name"`
+ // Options to run activity.
+ Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
+}
+
+// ExecuteChildWorkflow executes child workflow.
+type ExecuteChildWorkflow struct {
+ // Name defines workflow name.
+ Name string `json:"name"`
+ // Options to run activity.
+ Options bindings.WorkflowOptions `json:"options,omitempty"`
+}
+
+// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
+type GetChildWorkflowExecution struct {
+ // ID of child workflow command.
+ ID uint64 `json:"id"`
+}
+
+// NewTimer starts new timer.
+type NewTimer struct {
+ // Milliseconds defines timer duration.
+ Milliseconds int `json:"ms"`
+}
+
+// SideEffect to be recorded into the history.
+type SideEffect struct{}
+
+// GetVersion requests version marker.
+type GetVersion struct {
+ ChangeID string `json:"changeID"`
+ MinSupported int `json:"minSupported"`
+ MaxSupported int `json:"maxSupported"`
+}
+
+// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
+type CompleteWorkflow struct{}
+
+// ContinueAsNew restarts workflow with new running instance.
+type ContinueAsNew struct {
+ // Result defines workflow execution result.
+ Name string `json:"name"`
+
+ // Options for continued as new workflow.
+ Options struct {
+ TaskQueueName string
+ WorkflowExecutionTimeout time.Duration
+ WorkflowRunTimeout time.Duration
+ WorkflowTaskTimeout time.Duration
+ } `json:"options"`
+}
+
+// SignalExternalWorkflow sends signal to external workflow.
+type SignalExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+ Signal string `json:"signal"`
+ ChildWorkflowOnly bool `json:"childWorkflowOnly"`
+}
+
+// CancelExternalWorkflow canceller external workflow.
+type CancelExternalWorkflow struct {
+ Namespace string `json:"namespace"`
+ WorkflowID string `json:"workflowID"`
+ RunID string `json:"runID"`
+}
+
+// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
+type Cancel struct {
+ // CommandIDs to be cancelled.
+ CommandIDs []uint64 `json:"ids"`
+}
+
+// Panic triggers panic in workflow process.
+type Panic struct {
+ // Message to include into the error.
+ Message string `json:"message"`
+}
+
+// ActivityParams maps activity command to activity params.
+func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
+ params := bindings.ExecuteActivityParams{
+ ExecuteActivityOptions: cmd.Options,
+ ActivityType: bindings.ActivityType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// WorkflowParams maps workflow command to workflow params.
+func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
+ params := bindings.ExecuteWorkflowParams{
+ WorkflowOptions: cmd.Options,
+ WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
+ Input: payloads,
+ }
+
+ if params.TaskQueueName == "" {
+ params.TaskQueueName = env.WorkflowInfo().TaskQueueName
+ }
+
+ return params
+}
+
+// ToDuration converts timer command to time.Duration.
+func (cmd NewTimer) ToDuration() time.Duration {
+ return time.Millisecond * time.Duration(cmd.Milliseconds)
+}
+
+// returns command name (only for the commands sent to the worker)
+func commandName(cmd interface{}) (string, error) {
+ switch cmd.(type) {
+ case GetWorkerInfo, *GetWorkerInfo:
+ return getWorkerInfoCommand, nil
+ case StartWorkflow, *StartWorkflow:
+ return startWorkflowCommand, nil
+ case InvokeSignal, *InvokeSignal:
+ return invokeSignalCommand, nil
+ case InvokeQuery, *InvokeQuery:
+ return invokeQueryCommand, nil
+ case DestroyWorkflow, *DestroyWorkflow:
+ return destroyWorkflowCommand, nil
+ case CancelWorkflow, *CancelWorkflow:
+ return cancelWorkflowCommand, nil
+ case GetStackTrace, *GetStackTrace:
+ return getStackTraceCommand, nil
+ case InvokeActivity, *InvokeActivity:
+ return invokeActivityCommand, nil
+ case ExecuteActivity, *ExecuteActivity:
+ return executeActivityCommand, nil
+ case ExecuteChildWorkflow, *ExecuteChildWorkflow:
+ return executeChildWorkflowCommand, nil
+ case GetChildWorkflowExecution, *GetChildWorkflowExecution:
+ return getChildWorkflowExecutionCommand, nil
+ case NewTimer, *NewTimer:
+ return newTimerCommand, nil
+ case GetVersion, *GetVersion:
+ return getVersionCommand, nil
+ case SideEffect, *SideEffect:
+ return sideEffectCommand, nil
+ case CompleteWorkflow, *CompleteWorkflow:
+ return completeWorkflowCommand, nil
+ case ContinueAsNew, *ContinueAsNew:
+ return continueAsNewCommand, nil
+ case SignalExternalWorkflow, *SignalExternalWorkflow:
+ return signalExternalWorkflowCommand, nil
+ case CancelExternalWorkflow, *CancelExternalWorkflow:
+ return cancelExternalWorkflowCommand, nil
+ case Cancel, *Cancel:
+ return cancelCommand, nil
+ case Panic, *Panic:
+ return panicCommand, nil
+ default:
+ return "", errors.E(errors.Op("commandName"), "undefined command type", cmd)
+ }
+}
+
+// reads command from binary payload
+func initCommand(name string) (interface{}, error) {
+ switch name {
+ case getWorkerInfoCommand:
+ return &GetWorkerInfo{}, nil
+
+ case startWorkflowCommand:
+ return &StartWorkflow{}, nil
+
+ case invokeSignalCommand:
+ return &InvokeSignal{}, nil
+
+ case invokeQueryCommand:
+ return &InvokeQuery{}, nil
+
+ case destroyWorkflowCommand:
+ return &DestroyWorkflow{}, nil
+
+ case cancelWorkflowCommand:
+ return &CancelWorkflow{}, nil
+
+ case getStackTraceCommand:
+ return &GetStackTrace{}, nil
+
+ case invokeActivityCommand:
+ return &InvokeActivity{}, nil
+
+ case executeActivityCommand:
+ return &ExecuteActivity{}, nil
+
+ case executeChildWorkflowCommand:
+ return &ExecuteChildWorkflow{}, nil
+
+ case getChildWorkflowExecutionCommand:
+ return &GetChildWorkflowExecution{}, nil
+
+ case newTimerCommand:
+ return &NewTimer{}, nil
+
+ case getVersionCommand:
+ return &GetVersion{}, nil
+
+ case sideEffectCommand:
+ return &SideEffect{}, nil
+
+ case completeWorkflowCommand:
+ return &CompleteWorkflow{}, nil
+
+ case continueAsNewCommand:
+ return &ContinueAsNew{}, nil
+
+ case signalExternalWorkflowCommand:
+ return &SignalExternalWorkflow{}, nil
+
+ case cancelExternalWorkflowCommand:
+ return &CancelExternalWorkflow{}, nil
+
+ case cancelCommand:
+ return &Cancel{}, nil
+
+ case panicCommand:
+ return &Panic{}, nil
+
+ default:
+ return nil, errors.E(errors.Op("initCommand"), "undefined command type", name)
+ }
+}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
new file mode 100644
index 00000000..b41f02b6
--- /dev/null
+++ b/plugins/temporal/protocol/proto_codec.go
@@ -0,0 +1,144 @@
+package protocol
+
+import (
+ v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
+ jsoniter "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
+ "google.golang.org/protobuf/proto"
+)
+
+type (
+ // ProtoCodec uses protobuf to exchange messages with underlying workers.
+ ProtoCodec struct {
+ }
+)
+
+// NewProtoCodec creates new Proto communication codec.
+func NewProtoCodec() Codec {
+ return &ProtoCodec{}
+}
+
+// WithLogger creates new codes instance with attached logger.
+func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
+ return &ProtoCodec{}
+}
+
+// GetName returns codec name.
+func (c *ProtoCodec) GetName() string {
+ return "protobuf"
+}
+
+// Execute exchanges commands with worker.
+func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var request = &internal.Frame{}
+ var response = &internal.Frame{}
+ var result = make([]Message, 0, 5)
+ var err error
+
+ for _, m := range msg {
+ frame, err := c.packMessage(m)
+ if err != nil {
+ return nil, err
+ }
+
+ request.Messages = append(request.Messages, frame)
+ }
+
+ p := payload.Payload{}
+
+ // context is always in json format
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = jsoniter.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(errors.Op("encodeContext"), err)
+ }
+
+ p.Body, err = proto.Marshal(v1.MessageV2(request))
+ if err != nil {
+ return nil, errors.E(errors.Op("encodePayload"), err)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(errors.Op("execute"), err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ err = proto.Unmarshal(out.Body, v1.MessageV2(response))
+ if err != nil {
+ return nil, errors.E(errors.Op("parseResponse"), err)
+ }
+
+ for _, f := range response.Messages {
+ msg, err := c.parseMessage(f)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
+ var err error
+
+ frame := &internal.Message{
+ Id: msg.ID,
+ Payloads: msg.Payloads,
+ Failure: msg.Failure,
+ }
+
+ if msg.Command != nil {
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+
+ frame.Options, err = jsoniter.Marshal(msg.Command)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return frame, nil
+}
+
+func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
+ var err error
+
+ msg := Message{
+ ID: frame.Id,
+ Payloads: frame.Payloads,
+ Failure: frame.Failure,
+ }
+
+ if frame.Command != "" {
+ msg.Command, err = initCommand(frame.Command)
+ if err != nil {
+ return Message{}, err
+ }
+
+ err = jsoniter.Unmarshal(frame.Options, &msg.Command)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ return msg, nil
+}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
new file mode 100644
index 00000000..53076fdf
--- /dev/null
+++ b/plugins/temporal/protocol/protocol.go
@@ -0,0 +1,77 @@
+package protocol
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+const (
+ // DebugNone disables all debug messages.
+ DebugNone = iota
+
+ // DebugNormal renders all messages into console.
+ DebugNormal
+
+ // DebugHumanized enables color highlights for messages.
+ DebugHumanized
+)
+
+// Context provides worker information about currently. Context can be empty for server level commands.
+type Context struct {
+ // TaskQueue associates message batch with the specific task queue in underlying worker.
+ TaskQueue string `json:"taskQueue,omitempty"`
+
+ // TickTime associated current or historical time with message batch.
+ TickTime string `json:"tickTime,omitempty"`
+
+ // Replay indicates that current message batch is historical.
+ Replay bool `json:"replay,omitempty"`
+}
+
+// Message used to exchange the send commands and receive responses from underlying workers.
+type Message struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command of the message in unmarshalled form. Pointer.
+ Command interface{} `json:"command,omitempty"`
+
+ // Failure associated with command id.
+ Failure *failure.Failure `json:"failure,omitempty"`
+
+ // Payloads contains message specific payloads in binary format.
+ Payloads *commonpb.Payloads `json:"payloads,omitempty"`
+}
+
+// Codec manages payload encoding and decoding while communication with underlying worker.
+type Codec interface {
+ // WithLogger creates new codes instance with attached logger.
+ WithLogger(logger.Logger) Codec
+
+ // GetName returns codec name.
+ GetName() string
+
+ // Execute sends message to worker and waits for the response.
+ Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
+}
+
+// Endpoint provides the ability to send and receive messages.
+type Endpoint interface {
+ // ExecWithContext allow to set ExecTTL
+ Exec(p payload.Payload) (payload.Payload, error)
+}
+
+// DebugLevel configures debug level.
+type DebugLevel int
+
+// IsEmpty only check if task queue set.
+func (ctx Context) IsEmpty() bool {
+ return ctx.TaskQueue == ""
+}
+
+// IsCommand returns true if message carries request.
+func (msg Message) IsCommand() bool {
+ return msg.Command != nil
+}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
new file mode 100644
index 00000000..6dfcd81f
--- /dev/null
+++ b/plugins/temporal/protocol/worker_info.go
@@ -0,0 +1,72 @@
+package protocol
+
+import (
+ "github.com/spiral/errors"
+ "go.temporal.io/sdk/converter"
+ "go.temporal.io/sdk/worker"
+)
+
+// WorkerInfo outlines information about every available worker and it's TaskQueues.
+
+// WorkerInfo lists available task queues, workflows and activities.
+type WorkerInfo struct {
+ // TaskQueue assigned to the worker.
+ TaskQueue string `json:"taskQueue"`
+
+ // Options describe worker options.
+ Options worker.Options `json:"options,omitempty"`
+
+ // Workflows provided by the worker.
+ Workflows []WorkflowInfo
+
+ // Activities provided by the worker.
+ Activities []ActivityInfo
+}
+
+// WorkflowInfo describes single worker workflow.
+type WorkflowInfo struct {
+ // Name of the workflow.
+ Name string `json:"name"`
+
+ // Queries pre-defined for the workflow type.
+ Queries []string `json:"queries"`
+
+ // Signals pre-defined for the workflow type.
+ Signals []string `json:"signals"`
+}
+
+// ActivityInfo describes single worker activity.
+type ActivityInfo struct {
+ // Name describes public activity name.
+ Name string `json:"name"`
+}
+
+// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
+func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
+ const op = errors.Op("fetch_worker_info")
+
+ result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
+ if err != nil {
+ return nil, err
+ }
+
+ if len(result) != 1 {
+ return nil, errors.E(op, errors.Str("unable to read worker info"))
+ }
+
+ if result[0].ID != 0 {
+ return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
+ }
+
+ var info []WorkerInfo
+ for i := range result[0].Payloads.Payloads {
+ wi := WorkerInfo{}
+ if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ info = append(info, wi)
+ }
+
+ return info, nil
+}