summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol/protocol.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
committerValery Piashchynski <[email protected]>2021-01-25 22:47:02 +0300
commit43071e43a0743ff8c7913bba7819952962124355 (patch)
treee3b61113d3c0d28f972c71592af8b2f708994167 /plugins/temporal/protocol/protocol.go
parent5fd1168c687040ca7d72f4727ee1aec753d3f258 (diff)
Initial commit of the Temporal plugins set
Diffstat (limited to 'plugins/temporal/protocol/protocol.go')
-rw-r--r--plugins/temporal/protocol/protocol.go77
1 files changed, 77 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/protocol.go b/plugins/temporal/protocol/protocol.go
new file mode 100644
index 00000000..53076fdf
--- /dev/null
+++ b/plugins/temporal/protocol/protocol.go
@@ -0,0 +1,77 @@
+package protocol
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ commonpb "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+const (
+ // DebugNone disables all debug messages.
+ DebugNone = iota
+
+ // DebugNormal renders all messages into console.
+ DebugNormal
+
+ // DebugHumanized enables color highlights for messages.
+ DebugHumanized
+)
+
+// Context provides worker information about currently. Context can be empty for server level commands.
+type Context struct {
+ // TaskQueue associates message batch with the specific task queue in underlying worker.
+ TaskQueue string `json:"taskQueue,omitempty"`
+
+ // TickTime associated current or historical time with message batch.
+ TickTime string `json:"tickTime,omitempty"`
+
+ // Replay indicates that current message batch is historical.
+ Replay bool `json:"replay,omitempty"`
+}
+
+// Message used to exchange the send commands and receive responses from underlying workers.
+type Message struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command of the message in unmarshalled form. Pointer.
+ Command interface{} `json:"command,omitempty"`
+
+ // Failure associated with command id.
+ Failure *failure.Failure `json:"failure,omitempty"`
+
+ // Payloads contains message specific payloads in binary format.
+ Payloads *commonpb.Payloads `json:"payloads,omitempty"`
+}
+
+// Codec manages payload encoding and decoding while communication with underlying worker.
+type Codec interface {
+ // WithLogger creates new codes instance with attached logger.
+ WithLogger(logger.Logger) Codec
+
+ // GetName returns codec name.
+ GetName() string
+
+ // Execute sends message to worker and waits for the response.
+ Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
+}
+
+// Endpoint provides the ability to send and receive messages.
+type Endpoint interface {
+ // ExecWithContext allow to set ExecTTL
+ Exec(p payload.Payload) (payload.Payload, error)
+}
+
+// DebugLevel configures debug level.
+type DebugLevel int
+
+// IsEmpty only check if task queue set.
+func (ctx Context) IsEmpty() bool {
+ return ctx.TaskQueue == ""
+}
+
+// IsCommand returns true if message carries request.
+func (msg Message) IsCommand() bool {
+ return msg.Command != nil
+}