summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/protocol')
-rw-r--r--plugins/temporal/protocol/converter.go76
-rw-r--r--plugins/temporal/protocol/converter_test.go23
-rw-r--r--plugins/temporal/protocol/internal/protocol.pb.go167
-rw-r--r--plugins/temporal/protocol/json_codec.go225
-rw-r--r--plugins/temporal/protocol/message.go334
-rw-r--r--plugins/temporal/protocol/proto_codec.go145
-rw-r--r--plugins/temporal/protocol/protocol.go77
-rw-r--r--plugins/temporal/protocol/worker_info.go72
8 files changed, 0 insertions, 1119 deletions
diff --git a/plugins/temporal/protocol/converter.go b/plugins/temporal/protocol/converter.go
deleted file mode 100644
index 406e70f4..00000000
--- a/plugins/temporal/protocol/converter.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package protocol
-
-import (
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-type (
- // DataConverter wraps Temporal data converter to enable direct access to the payloads.
- DataConverter struct {
- fallback converter.DataConverter
- }
-)
-
-// NewDataConverter creates new data converter.
-func NewDataConverter(fallback converter.DataConverter) converter.DataConverter {
- return &DataConverter{fallback: fallback}
-}
-
-// ToPayloads converts a list of values.
-func (r *DataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
- for _, v := range values {
- if aggregated, ok := v.(*commonpb.Payloads); ok {
- // bypassing
- return aggregated, nil
- }
- }
-
- return r.fallback.ToPayloads(values...)
-}
-
-// ToPayload converts single value to payload.
-func (r *DataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
- return r.fallback.ToPayload(value)
-}
-
-// FromPayloads converts to a list of values of different types.
-// Useful for deserializing arguments of function invocations.
-func (r *DataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
- if payloads == nil {
- return nil
- }
-
- if len(valuePtrs) == 1 {
- // input proxying
- if input, ok := valuePtrs[0].(**commonpb.Payloads); ok {
- *input = &commonpb.Payloads{}
- (*input).Payloads = payloads.Payloads
- return nil
- }
- }
-
- for i := 0; i < len(payloads.Payloads); i++ {
- err := r.FromPayload(payloads.Payloads[i], valuePtrs[i])
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// FromPayload converts single value from payload.
-func (r *DataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
- return r.fallback.FromPayload(payload, valuePtr)
-}
-
-// ToString converts payload object into human readable string.
-func (r *DataConverter) ToString(input *commonpb.Payload) string {
- return r.fallback.ToString(input)
-}
-
-// ToStrings converts payloads object into human readable strings.
-func (r *DataConverter) ToStrings(input *commonpb.Payloads) []string {
- return r.fallback.ToStrings(input)
-}
diff --git a/plugins/temporal/protocol/converter_test.go b/plugins/temporal/protocol/converter_test.go
deleted file mode 100644
index 6ce9fa0f..00000000
--- a/plugins/temporal/protocol/converter_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package protocol
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/converter"
-)
-
-func Test_Passthough(t *testing.T) {
- codec := NewDataConverter(converter.GetDefaultDataConverter())
-
- value, err := codec.ToPayloads("test")
- assert.NoError(t, err)
-
- out := &common.Payloads{}
-
- assert.Len(t, out.Payloads, 0)
- assert.NoError(t, codec.FromPayloads(value, &out))
-
- assert.Len(t, out.Payloads, 1)
-}
diff --git a/plugins/temporal/protocol/internal/protocol.pb.go b/plugins/temporal/protocol/internal/protocol.pb.go
deleted file mode 100644
index c554e28f..00000000
--- a/plugins/temporal/protocol/internal/protocol.pb.go
+++ /dev/null
@@ -1,167 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// source: protocol.proto
-
-package internal
-
-import (
- fmt "fmt"
- math "math"
-
- proto "github.com/golang/protobuf/proto"
- v11 "go.temporal.io/api/common/v1"
- v1 "go.temporal.io/api/failure/v1"
-)
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = fmt.Errorf
-var _ = math.Inf
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the proto package it is being compiled against.
-// A compilation error at this line likely means your copy of the
-// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
-
-type Frame struct {
- Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Frame) Reset() { *m = Frame{} }
-func (m *Frame) String() string { return proto.CompactTextString(m) }
-func (*Frame) ProtoMessage() {}
-func (*Frame) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{0}
-}
-
-func (m *Frame) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Frame.Unmarshal(m, b)
-}
-func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
-}
-func (m *Frame) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Frame.Merge(m, src)
-}
-func (m *Frame) XXX_Size() int {
- return xxx_messageInfo_Frame.Size(m)
-}
-func (m *Frame) XXX_DiscardUnknown() {
- xxx_messageInfo_Frame.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Frame proto.InternalMessageInfo
-
-func (m *Frame) GetMessages() []*Message {
- if m != nil {
- return m.Messages
- }
- return nil
-}
-
-// Single communication message.
-type Message struct {
- Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
- // command name (if any)
- Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
- // command options in json format.
- Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"`
- // error response.
- Failure *v1.Failure `protobuf:"bytes,4,opt,name=failure,proto3" json:"failure,omitempty"`
- // invocation or result payloads.
- Payloads *v11.Payloads `protobuf:"bytes,5,opt,name=payloads,proto3" json:"payloads,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_unrecognized []byte `json:"-"`
- XXX_sizecache int32 `json:"-"`
-}
-
-func (m *Message) Reset() { *m = Message{} }
-func (m *Message) String() string { return proto.CompactTextString(m) }
-func (*Message) ProtoMessage() {}
-func (*Message) Descriptor() ([]byte, []int) {
- return fileDescriptor_2bc2336598a3f7e0, []int{1}
-}
-
-func (m *Message) XXX_Unmarshal(b []byte) error {
- return xxx_messageInfo_Message.Unmarshal(m, b)
-}
-func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
- return xxx_messageInfo_Message.Marshal(b, m, deterministic)
-}
-func (m *Message) XXX_Merge(src proto.Message) {
- xxx_messageInfo_Message.Merge(m, src)
-}
-func (m *Message) XXX_Size() int {
- return xxx_messageInfo_Message.Size(m)
-}
-func (m *Message) XXX_DiscardUnknown() {
- xxx_messageInfo_Message.DiscardUnknown(m)
-}
-
-var xxx_messageInfo_Message proto.InternalMessageInfo
-
-func (m *Message) GetId() uint64 {
- if m != nil {
- return m.Id
- }
- return 0
-}
-
-func (m *Message) GetCommand() string {
- if m != nil {
- return m.Command
- }
- return ""
-}
-
-func (m *Message) GetOptions() []byte {
- if m != nil {
- return m.Options
- }
- return nil
-}
-
-func (m *Message) GetFailure() *v1.Failure {
- if m != nil {
- return m.Failure
- }
- return nil
-}
-
-func (m *Message) GetPayloads() *v11.Payloads {
- if m != nil {
- return m.Payloads
- }
- return nil
-}
-
-func init() {
- proto.RegisterType((*Frame)(nil), "temporal.roadrunner.internal.Frame")
- proto.RegisterType((*Message)(nil), "temporal.roadrunner.internal.Message")
-}
-
-func init() { proto.RegisterFile("protocol.proto", fileDescriptor_2bc2336598a3f7e0) }
-
-var fileDescriptor_2bc2336598a3f7e0 = []byte{
- // 257 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x90, 0x41, 0x4b, 0x33, 0x31,
- 0x10, 0x86, 0xc9, 0xb6, 0xfd, 0xb6, 0x5f, 0x2a, 0x3d, 0xe4, 0x14, 0x8a, 0x87, 0x50, 0x2c, 0xec,
- 0x29, 0x4b, 0xeb, 0x4d, 0xbc, 0x78, 0xe9, 0x41, 0x10, 0x24, 0xff, 0x60, 0x6c, 0xa2, 0x04, 0x92,
- 0x4c, 0x48, 0xb6, 0x05, 0x7f, 0xa1, 0x7f, 0x4b, 0xb6, 0x9b, 0xad, 0x88, 0xe2, 0x6d, 0x26, 0xef,
- 0xfb, 0x84, 0x87, 0xa1, 0xcb, 0x98, 0xb0, 0xc3, 0x03, 0x3a, 0x79, 0x1e, 0xd8, 0x75, 0x67, 0x7c,
- 0xc4, 0x04, 0x4e, 0x26, 0x04, 0x9d, 0x8e, 0x21, 0x98, 0x24, 0x6d, 0xe8, 0x4c, 0x0a, 0xe0, 0x56,
- 0x37, 0x63, 0xda, 0x42, 0xb4, 0xed, 0x01, 0xbd, 0xc7, 0xd0, 0x9e, 0xb6, 0xad, 0x37, 0x39, 0xc3,
- 0x9b, 0x19, 0xfe, 0x58, 0x6d, 0xbe, 0xb5, 0x5e, 0xc1, 0xba, 0x63, 0x32, 0x3f, 0x6a, 0xeb, 0x47,
- 0x3a, 0xdb, 0x27, 0xf0, 0x86, 0x3d, 0xd0, 0x79, 0x49, 0x32, 0x27, 0x62, 0xd2, 0x2c, 0x76, 0x1b,
- 0xf9, 0x97, 0x86, 0x7c, 0x1a, 0xda, 0xea, 0x82, 0xad, 0x3f, 0x08, 0xad, 0xcb, 0x2b, 0x5b, 0xd2,
- 0xca, 0x6a, 0x4e, 0x04, 0x69, 0xa6, 0xaa, 0xb2, 0x9a, 0x71, 0x5a, 0xf7, 0xa6, 0x10, 0x34, 0xaf,
- 0x04, 0x69, 0xfe, 0xab, 0x71, 0xed, 0x13, 0x8c, 0x9d, 0xc5, 0x90, 0xf9, 0x44, 0x90, 0xe6, 0x4a,
- 0x8d, 0x2b, 0xbb, 0xa3, 0x75, 0xf1, 0xe6, 0x53, 0x41, 0x9a, 0xc5, 0x4e, 0x7c, 0x19, 0x41, 0xb4,
- 0xb2, 0x84, 0xf2, 0xb4, 0x95, 0xfb, 0x61, 0x54, 0x23, 0xc0, 0xee, 0xe9, 0x3c, 0xc2, 0xbb, 0x43,
- 0xd0, 0x99, 0xcf, 0x7e, 0x83, 0x87, 0xbb, 0xf5, 0xec, 0x73, 0xe9, 0xa9, 0x0b, 0xf1, 0xf2, 0xef,
- 0x7c, 0x9c, 0xdb, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x59, 0xb1, 0x79, 0xd4, 0x99, 0x01, 0x00,
- 0x00,
-}
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
deleted file mode 100644
index e7a77068..00000000
--- a/plugins/temporal/protocol/json_codec.go
+++ /dev/null
@@ -1,225 +0,0 @@
-package protocol
-
-import (
- "github.com/fatih/color"
- j "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-// JSONCodec can be used for debugging and log capturing reasons.
-type JSONCodec struct {
- // level enables verbose logging or all incoming and outcoming messages.
- level DebugLevel
-
- // logger renders messages when debug enabled.
- logger logger.Logger
-}
-
-// jsonFrame contains message command in binary form.
-type jsonFrame struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command name. Optional.
- Command string `json:"command,omitempty"`
-
- // Options to be unmarshalled to body (raw payload).
- Options j.RawMessage `json:"options,omitempty"`
-
- // Failure associated with command id.
- Failure []byte `json:"failure,omitempty"`
-
- // Payloads specific to the command or result.
- Payloads []byte `json:"payloads,omitempty"`
-}
-
-// NewJSONCodec creates new Json communication codec.
-func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
- return &JSONCodec{
- level: level,
- logger: logger,
- }
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
- return &JSONCodec{
- level: c.level,
- logger: logger,
- }
-}
-
-// GetName returns codec name.
-func (c *JSONCodec) GetName() string {
- return "json"
-}
-
-// Execute exchanges commands with worker.
-func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- const op = errors.Op("json_codec_execute")
- if len(msg) == 0 {
- return nil, nil
- }
-
- var response = make([]jsonFrame, 0, 5)
- var result = make([]Message, 0, 5)
- var err error
-
- frames := make([]jsonFrame, 0, len(msg))
- for _, m := range msg {
- frame, err := c.packFrame(m)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- frames = append(frames, frame)
- }
-
- p := payload.Payload{}
-
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = json.Marshal(ctx)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- p.Body, err = json.Marshal(frames)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if c.level >= DebugNormal {
- logMessage := string(p.Body) + " " + string(p.Context)
- if c.level >= DebugHumanized {
- logMessage = color.GreenString(logMessage)
- }
-
- c.logger.Debug(logMessage)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- if c.level >= DebugNormal {
- logMessage := string(out.Body)
- if c.level >= DebugHumanized {
- logMessage = color.HiYellowString(logMessage)
- }
-
- c.logger.Debug(logMessage, "receive", true)
- }
-
- err = json.Unmarshal(out.Body, &response)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- for _, f := range response {
- msg, err := c.parseFrame(f)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
- var (
- err error
- frame jsonFrame
- )
-
- frame.ID = msg.ID
-
- if msg.Payloads != nil {
- frame.Payloads, err = msg.Payloads.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Failure != nil {
- frame.Failure, err = msg.Failure.Marshal()
- if err != nil {
- return jsonFrame{}, err
- }
- }
-
- if msg.Command == nil {
- return frame, nil
- }
-
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- frame.Options, err = json.Marshal(msg.Command)
- if err != nil {
- return jsonFrame{}, err
- }
-
- return frame, nil
-}
-
-func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
- var (
- err error
- msg Message
- )
-
- msg.ID = frame.ID
-
- if frame.Payloads != nil {
- msg.Payloads = &common.Payloads{}
-
- err = msg.Payloads.Unmarshal(frame.Payloads)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Failure != nil {
- msg.Failure = &failure.Failure{}
-
- err = msg.Failure.Unmarshal(frame.Failure)
- if err != nil {
- return Message{}, err
- }
- }
-
- if frame.Command != "" {
- cmd, err := initCommand(frame.Command)
- if err != nil {
- return Message{}, err
- }
-
- err = json.Unmarshal(frame.Options, &cmd)
- if err != nil {
- return Message{}, err
- }
-
- msg.Command = cmd
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
deleted file mode 100644
index d5e0f49d..00000000
--- a/plugins/temporal/protocol/message.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package protocol
-
-import (
- "time"
-
- "github.com/spiral/errors"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/sdk/activity"
- bindings "go.temporal.io/sdk/internalbindings"
- "go.temporal.io/sdk/workflow"
-)
-
-const (
- getWorkerInfoCommand = "GetWorkerInfo"
-
- invokeActivityCommand = "InvokeActivity"
- startWorkflowCommand = "StartWorkflow"
- invokeSignalCommand = "InvokeSignal"
- invokeQueryCommand = "InvokeQuery"
- destroyWorkflowCommand = "DestroyWorkflow"
- cancelWorkflowCommand = "CancelWorkflow"
- getStackTraceCommand = "StackTrace"
-
- executeActivityCommand = "ExecuteActivity"
- executeChildWorkflowCommand = "ExecuteChildWorkflow"
- getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"
-
- newTimerCommand = "NewTimer"
- sideEffectCommand = "SideEffect"
- getVersionCommand = "GetVersion"
- completeWorkflowCommand = "CompleteWorkflow"
- continueAsNewCommand = "ContinueAsNew"
-
- signalExternalWorkflowCommand = "SignalExternalWorkflow"
- cancelExternalWorkflowCommand = "CancelExternalWorkflow"
-
- cancelCommand = "Cancel"
- panicCommand = "Panic"
-)
-
-// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct{}
-
-// InvokeActivity invokes activity.
-type InvokeActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
-
- // Info contains execution context.
- Info activity.Info `json:"info"`
-
- // HeartbeatDetails indicates that the payload also contains last heartbeat details.
- HeartbeatDetails int `json:"heartbeatDetails,omitempty"`
-}
-
-// StartWorkflow sends worker command to start workflow.
-type StartWorkflow struct {
- // Info to define workflow context.
- Info *workflow.Info `json:"info"`
-
- // LastCompletion contains offset of last completion results.
- LastCompletion int `json:"lastCompletion,omitempty"`
-}
-
-// InvokeSignal invokes signal with a set of arguments.
-type InvokeSignal struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-
- // Name of the signal.
- Name string `json:"name"`
-}
-
-// InvokeQuery invokes query with a set of arguments.
-type InvokeQuery struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
- // Name of the query.
- Name string `json:"name"`
-}
-
-// CancelWorkflow asks worker to gracefully stop workflow, if possible (signal).
-type CancelWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// DestroyWorkflow asks worker to offload workflow from memory.
-type DestroyWorkflow struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// GetStackTrace asks worker to offload workflow from memory.
-type GetStackTrace struct {
- // RunID workflow run id.
- RunID string `json:"runId"`
-}
-
-// ExecuteActivity command by workflow worker.
-type ExecuteActivity struct {
- // Name defines activity name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.ExecuteActivityOptions `json:"options,omitempty"`
-}
-
-// ExecuteChildWorkflow executes child workflow.
-type ExecuteChildWorkflow struct {
- // Name defines workflow name.
- Name string `json:"name"`
- // Options to run activity.
- Options bindings.WorkflowOptions `json:"options,omitempty"`
-}
-
-// GetChildWorkflowExecution returns the WorkflowID and RunId of child workflow.
-type GetChildWorkflowExecution struct {
- // ID of child workflow command.
- ID uint64 `json:"id"`
-}
-
-// NewTimer starts new timer.
-type NewTimer struct {
- // Milliseconds defines timer duration.
- Milliseconds int `json:"ms"`
-}
-
-// SideEffect to be recorded into the history.
-type SideEffect struct{}
-
-// GetVersion requests version marker.
-type GetVersion struct {
- ChangeID string `json:"changeID"`
- MinSupported int `json:"minSupported"`
- MaxSupported int `json:"maxSupported"`
-}
-
-// CompleteWorkflow sent by worker to complete workflow. Might include additional error as part of the payload.
-type CompleteWorkflow struct{}
-
-// ContinueAsNew restarts workflow with new running instance.
-type ContinueAsNew struct {
- // Result defines workflow execution result.
- Name string `json:"name"`
-
- // Options for continued as new workflow.
- Options struct {
- TaskQueueName string
- WorkflowExecutionTimeout time.Duration
- WorkflowRunTimeout time.Duration
- WorkflowTaskTimeout time.Duration
- } `json:"options"`
-}
-
-// SignalExternalWorkflow sends signal to external workflow.
-type SignalExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
- Signal string `json:"signal"`
- ChildWorkflowOnly bool `json:"childWorkflowOnly"`
-}
-
-// CancelExternalWorkflow canceller external workflow.
-type CancelExternalWorkflow struct {
- Namespace string `json:"namespace"`
- WorkflowID string `json:"workflowID"`
- RunID string `json:"runID"`
-}
-
-// Cancel one or multiple internal promises (activities, local activities, timers, child workflows).
-type Cancel struct {
- // CommandIDs to be cancelled.
- CommandIDs []uint64 `json:"ids"`
-}
-
-// Panic triggers panic in workflow process.
-type Panic struct {
- // Message to include into the error.
- Message string `json:"message"`
-}
-
-// ActivityParams maps activity command to activity params.
-func (cmd ExecuteActivity) ActivityParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteActivityParams {
- params := bindings.ExecuteActivityParams{
- ExecuteActivityOptions: cmd.Options,
- ActivityType: bindings.ActivityType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// WorkflowParams maps workflow command to workflow params.
-func (cmd ExecuteChildWorkflow) WorkflowParams(env bindings.WorkflowEnvironment, payloads *commonpb.Payloads) bindings.ExecuteWorkflowParams {
- params := bindings.ExecuteWorkflowParams{
- WorkflowOptions: cmd.Options,
- WorkflowType: &bindings.WorkflowType{Name: cmd.Name},
- Input: payloads,
- }
-
- if params.TaskQueueName == "" {
- params.TaskQueueName = env.WorkflowInfo().TaskQueueName
- }
-
- return params
-}
-
-// ToDuration converts timer command to time.Duration.
-func (cmd NewTimer) ToDuration() time.Duration {
- return time.Millisecond * time.Duration(cmd.Milliseconds)
-}
-
-// returns command name (only for the commands sent to the worker)
-func commandName(cmd interface{}) (string, error) {
- const op = errors.Op("command_name")
- switch cmd.(type) {
- case GetWorkerInfo, *GetWorkerInfo:
- return getWorkerInfoCommand, nil
- case StartWorkflow, *StartWorkflow:
- return startWorkflowCommand, nil
- case InvokeSignal, *InvokeSignal:
- return invokeSignalCommand, nil
- case InvokeQuery, *InvokeQuery:
- return invokeQueryCommand, nil
- case DestroyWorkflow, *DestroyWorkflow:
- return destroyWorkflowCommand, nil
- case CancelWorkflow, *CancelWorkflow:
- return cancelWorkflowCommand, nil
- case GetStackTrace, *GetStackTrace:
- return getStackTraceCommand, nil
- case InvokeActivity, *InvokeActivity:
- return invokeActivityCommand, nil
- case ExecuteActivity, *ExecuteActivity:
- return executeActivityCommand, nil
- case ExecuteChildWorkflow, *ExecuteChildWorkflow:
- return executeChildWorkflowCommand, nil
- case GetChildWorkflowExecution, *GetChildWorkflowExecution:
- return getChildWorkflowExecutionCommand, nil
- case NewTimer, *NewTimer:
- return newTimerCommand, nil
- case GetVersion, *GetVersion:
- return getVersionCommand, nil
- case SideEffect, *SideEffect:
- return sideEffectCommand, nil
- case CompleteWorkflow, *CompleteWorkflow:
- return completeWorkflowCommand, nil
- case ContinueAsNew, *ContinueAsNew:
- return continueAsNewCommand, nil
- case SignalExternalWorkflow, *SignalExternalWorkflow:
- return signalExternalWorkflowCommand, nil
- case CancelExternalWorkflow, *CancelExternalWorkflow:
- return cancelExternalWorkflowCommand, nil
- case Cancel, *Cancel:
- return cancelCommand, nil
- case Panic, *Panic:
- return panicCommand, nil
- default:
- return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
- }
-}
-
-// reads command from binary payload
-func initCommand(name string) (interface{}, error) {
- const op = errors.Op("init_command")
- switch name {
- case getWorkerInfoCommand:
- return &GetWorkerInfo{}, nil
-
- case startWorkflowCommand:
- return &StartWorkflow{}, nil
-
- case invokeSignalCommand:
- return &InvokeSignal{}, nil
-
- case invokeQueryCommand:
- return &InvokeQuery{}, nil
-
- case destroyWorkflowCommand:
- return &DestroyWorkflow{}, nil
-
- case cancelWorkflowCommand:
- return &CancelWorkflow{}, nil
-
- case getStackTraceCommand:
- return &GetStackTrace{}, nil
-
- case invokeActivityCommand:
- return &InvokeActivity{}, nil
-
- case executeActivityCommand:
- return &ExecuteActivity{}, nil
-
- case executeChildWorkflowCommand:
- return &ExecuteChildWorkflow{}, nil
-
- case getChildWorkflowExecutionCommand:
- return &GetChildWorkflowExecution{}, nil
-
- case newTimerCommand:
- return &NewTimer{}, nil
-
- case getVersionCommand:
- return &GetVersion{}, nil
-
- case sideEffectCommand:
- return &SideEffect{}, nil
-
- case completeWorkflowCommand:
- return &CompleteWorkflow{}, nil
-
- case continueAsNewCommand:
- return &ContinueAsNew{}, nil
-
- case signalExternalWorkflowCommand:
- return &SignalExternalWorkflow{}, nil
-
- case cancelExternalWorkflowCommand:
- return &CancelExternalWorkflow{}, nil
-
- case cancelCommand:
- return &Cancel{}, nil
-
- case panicCommand:
- return &Panic{}, nil
-
- default:
- return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
- }
-}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
deleted file mode 100644
index 607fe0fe..00000000
--- a/plugins/temporal/protocol/proto_codec.go
+++ /dev/null
@@ -1,145 +0,0 @@
-package protocol
-
-import (
- v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
- jsoniter "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
- "google.golang.org/protobuf/proto"
-)
-
-type (
- // ProtoCodec uses protobuf to exchange messages with underlying workers.
- ProtoCodec struct {
- }
-)
-
-// NewProtoCodec creates new Proto communication codec.
-func NewProtoCodec() Codec {
- return &ProtoCodec{}
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
- return &ProtoCodec{}
-}
-
-// GetName returns codec name.
-func (c *ProtoCodec) GetName() string {
- return "protobuf"
-}
-
-// Execute exchanges commands with worker.
-func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- if len(msg) == 0 {
- return nil, nil
- }
-
- var request = &internal.Frame{}
- var response = &internal.Frame{}
- var result = make([]Message, 0, 5)
- var err error
-
- for _, m := range msg {
- frame, err := c.packMessage(m)
- if err != nil {
- return nil, err
- }
-
- request.Messages = append(request.Messages, frame)
- }
-
- p := payload.Payload{}
-
- // context is always in json format
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = jsoniter.Marshal(ctx)
- if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
- }
-
- p.Body, err = proto.Marshal(v1.MessageV2(request))
- if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- err = proto.Unmarshal(out.Body, v1.MessageV2(response))
- if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
- }
-
- for _, f := range response.Messages {
- msg, err := c.parseMessage(f)
- if err != nil {
- return nil, err
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
- var err error
-
- frame := &internal.Message{
- Id: msg.ID,
- Payloads: msg.Payloads,
- Failure: msg.Failure,
- }
-
- if msg.Command != nil {
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return nil, err
- }
-
- frame.Options, err = jsoniter.Marshal(msg.Command)
- if err != nil {
- return nil, err
- }
- }
-
- return frame, nil
-}
-
-func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
- const op = errors.Op("proto_codec_parse_message")
- var err error
-
- msg := Message{
- ID: frame.Id,
- Payloads: frame.Payloads,
- Failure: frame.Failure,
- }
-
- if frame.Command != "" {
- msg.Command, err = initCommand(frame.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
-
- err = jsoniter.Unmarshal(frame.Options, &msg.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
- }
-
- return msg, nil
-}
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
deleted file mode 100644
index 53076fdf..00000000
--- a/plugins/temporal/protocol/protocol.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- commonpb "go.temporal.io/api/common/v1"
- "go.temporal.io/api/failure/v1"
-)
-
-const (
- // DebugNone disables all debug messages.
- DebugNone = iota
-
- // DebugNormal renders all messages into console.
- DebugNormal
-
- // DebugHumanized enables color highlights for messages.
- DebugHumanized
-)
-
-// Context provides worker information about currently. Context can be empty for server level commands.
-type Context struct {
- // TaskQueue associates message batch with the specific task queue in underlying worker.
- TaskQueue string `json:"taskQueue,omitempty"`
-
- // TickTime associated current or historical time with message batch.
- TickTime string `json:"tickTime,omitempty"`
-
- // Replay indicates that current message batch is historical.
- Replay bool `json:"replay,omitempty"`
-}
-
-// Message used to exchange the send commands and receive responses from underlying workers.
-type Message struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
-
- // Command of the message in unmarshalled form. Pointer.
- Command interface{} `json:"command,omitempty"`
-
- // Failure associated with command id.
- Failure *failure.Failure `json:"failure,omitempty"`
-
- // Payloads contains message specific payloads in binary format.
- Payloads *commonpb.Payloads `json:"payloads,omitempty"`
-}
-
-// Codec manages payload encoding and decoding while communication with underlying worker.
-type Codec interface {
- // WithLogger creates new codes instance with attached logger.
- WithLogger(logger.Logger) Codec
-
- // GetName returns codec name.
- GetName() string
-
- // Execute sends message to worker and waits for the response.
- Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
-}
-
-// Endpoint provides the ability to send and receive messages.
-type Endpoint interface {
- // ExecWithContext allow to set ExecTTL
- Exec(p payload.Payload) (payload.Payload, error)
-}
-
-// DebugLevel configures debug level.
-type DebugLevel int
-
-// IsEmpty only check if task queue set.
-func (ctx Context) IsEmpty() bool {
- return ctx.TaskQueue == ""
-}
-
-// IsCommand returns true if message carries request.
-func (msg Message) IsCommand() bool {
- return msg.Command != nil
-}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
deleted file mode 100644
index 58a0ae66..00000000
--- a/plugins/temporal/protocol/worker_info.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package protocol
-
-import (
- "github.com/spiral/errors"
- "go.temporal.io/sdk/converter"
- "go.temporal.io/sdk/worker"
-)
-
-// WorkerInfo outlines information about every available worker and it's TaskQueues.
-
-// WorkerInfo lists available task queues, workflows and activities.
-type WorkerInfo struct {
- // TaskQueue assigned to the worker.
- TaskQueue string `json:"taskQueue"`
-
- // Options describe worker options.
- Options worker.Options `json:"options,omitempty"`
-
- // Workflows provided by the worker.
- Workflows []WorkflowInfo
-
- // Activities provided by the worker.
- Activities []ActivityInfo
-}
-
-// WorkflowInfo describes single worker workflow.
-type WorkflowInfo struct {
- // Name of the workflow.
- Name string `json:"name"`
-
- // Queries pre-defined for the workflow type.
- Queries []string `json:"queries"`
-
- // Signals pre-defined for the workflow type.
- Signals []string `json:"signals"`
-}
-
-// ActivityInfo describes single worker activity.
-type ActivityInfo struct {
- // Name describes public activity name.
- Name string `json:"name"`
-}
-
-// FetchWorkerInfo fetches information about all underlying workers (can be multiplexed inside single process).
-func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerInfo, error) {
- const op = errors.Op("fetch_worker_info")
-
- result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if len(result) != 1 {
- return nil, errors.E(op, errors.Str("unable to read worker info"))
- }
-
- if result[0].ID != 0 {
- return nil, errors.E(op, errors.Str("FetchWorkerInfo confirmation missing"))
- }
-
- var info []WorkerInfo
- for i := range result[0].Payloads.Payloads {
- wi := WorkerInfo{}
- if err := dc.FromPayload(result[0].Payloads.Payloads[i], &wi); err != nil {
- return nil, errors.E(op, err)
- }
-
- info = append(info, wi)
- }
-
- return info, nil
-}