summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol/protocol.go
blob: 53076fdf219753af1f634245b9f49482301f23f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package protocol

import (
	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/plugins/logger"
	commonpb "go.temporal.io/api/common/v1"
	"go.temporal.io/api/failure/v1"
)

// Debug output modes. The values are untyped integer constants numbered
// consecutively from zero via iota.
//
// NOTE(review): a DebugLevel type is declared in this file, but these constants
// are not typed with it — confirm whether that is intentional.
const (
	// DebugNone disables all debug messages.
	DebugNone = iota

	// DebugNormal renders all messages into the console.
	DebugNormal

	// DebugHumanized enables color highlights for messages.
	DebugHumanized
)

// Context carries the metadata that accompanies a message batch sent to a
// worker: the task queue, the tick time and the replay flag. Context can be
// empty for server level commands (see IsEmpty).
//
// Every field is tagged omitempty in its JSON tag.
type Context struct {
	// TaskQueue associates the message batch with a specific task queue in the underlying worker.
	TaskQueue string `json:"taskQueue,omitempty"`

	// TickTime is the current or historical time associated with the message batch.
	// It is carried as a string; the format is not defined by this type.
	TickTime string `json:"tickTime,omitempty"`

	// Replay indicates that the current message batch is historical.
	Replay bool `json:"replay,omitempty"`
}

// Message is the unit exchanged with underlying workers: it is used both to
// send commands and to receive responses.
type Message struct {
	// ID contains the ID of the command, response or error.
	ID uint64 `json:"id"`

	// Command is the command of the message in unmarshalled form; expected to be
	// a pointer. A nil Command means the message is not a command (see IsCommand).
	Command interface{} `json:"command,omitempty"`

	// Failure is the failure associated with the command ID, if any.
	Failure *failure.Failure `json:"failure,omitempty"`

	// Payloads contains message specific payloads in binary format.
	Payloads *commonpb.Payloads `json:"payloads,omitempty"`
}

// Codec manages payload encoding and decoding while communicating with the underlying worker.
type Codec interface {
	// WithLogger creates a new codec instance with the given logger attached.
	WithLogger(logger.Logger) Codec

	// GetName returns the codec name.
	GetName() string

	// Execute sends the given messages to the worker through endpoint e, together
	// with the batch context ctx, and waits for the response. It returns the
	// messages received in reply, or an error.
	Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
}

// Endpoint provides the ability to send and receive messages.
type Endpoint interface {
	// Exec takes a request payload and returns the response payload, or an error.
	// No timeout or context parameter is part of this signature.
	Exec(p payload.Payload) (payload.Payload, error)
}

// DebugLevel is the integer type used to configure the debug level.
//
// NOTE(review): presumably holds one of DebugNone, DebugNormal or
// DebugHumanized; those constants are declared untyped — confirm.
type DebugLevel int

// IsEmpty reports whether the context has no task queue set. Only TaskQueue is
// inspected; TickTime and Replay do not influence the result.
func (ctx Context) IsEmpty() bool {
	return len(ctx.TaskQueue) == 0
}

// IsCommand reports whether the message carries a request, i.e. whether its
// Command field is non-nil.
func (msg Message) IsCommand() bool {
	if msg.Command == nil {
		return false
	}
	return true
}