blob: 53076fdf219753af1f634245b9f49482301f23f0 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package protocol
import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
)
const (
// DebugNone disables all debug messages.
DebugNone = iota
// DebugNormal renders all messages into console.
DebugNormal
// DebugHumanized enables color highlights for messages.
DebugHumanized
)
// Context provides worker information about currently. Context can be empty for server level commands.
type Context struct {
// TaskQueue associates message batch with the specific task queue in underlying worker.
TaskQueue string `json:"taskQueue,omitempty"`
// TickTime associated current or historical time with message batch.
TickTime string `json:"tickTime,omitempty"`
// Replay indicates that current message batch is historical.
Replay bool `json:"replay,omitempty"`
}
// Message used to exchange the send commands and receive responses from underlying workers.
type Message struct {
// ID contains ID of the command, response or error.
ID uint64 `json:"id"`
// Command of the message in unmarshalled form. Pointer.
Command interface{} `json:"command,omitempty"`
// Failure associated with command id.
Failure *failure.Failure `json:"failure,omitempty"`
// Payloads contains message specific payloads in binary format.
Payloads *commonpb.Payloads `json:"payloads,omitempty"`
}
// Codec manages payload encoding and decoding while communication with underlying worker.
type Codec interface {
// WithLogger creates new codes instance with attached logger.
WithLogger(logger.Logger) Codec
// GetName returns codec name.
GetName() string
// Execute sends message to worker and waits for the response.
Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error)
}
// Endpoint provides the ability to send and receive messages.
type Endpoint interface {
// ExecWithContext allow to set ExecTTL
Exec(p payload.Payload) (payload.Payload, error)
}
// DebugLevel configures debug level.
type DebugLevel int
// IsEmpty only check if task queue set.
func (ctx Context) IsEmpty() bool {
return ctx.TaskQueue == ""
}
// IsCommand returns true if message carries request.
func (msg Message) IsCommand() bool {
return msg.Command != nil
}
|