summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-01-26 01:06:16 +0300
commit4638bdca80f75bc120b330022086d31c8b41be5b (patch)
tree2362cdb39dc2e793f5bec7fd9b8d2363f516c1d4 /plugins/temporal/protocol
parent7756eb25453c8006fbd75aa5c97159e96331b840 (diff)
Code cleanup
Diffstat (limited to 'plugins/temporal/protocol')
-rw-r--r--plugins/temporal/protocol/json_codec.go77
-rw-r--r--plugins/temporal/protocol/message.go9
-rw-r--r--plugins/temporal/protocol/proto_codec.go5
-rw-r--r--plugins/temporal/protocol/worker_info.go2
4 files changed, 47 insertions, 46 deletions
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
index dae3a7d0..e7a77068 100644
--- a/plugins/temporal/protocol/json_codec.go
+++ b/plugins/temporal/protocol/json_codec.go
@@ -2,7 +2,7 @@ package protocol
import (
"github.com/fatih/color"
- jsoniter "github.com/json-iterator/go"
+ j "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -10,34 +10,34 @@ import (
"go.temporal.io/api/failure/v1"
)
-type (
- // JSONCodec can be used for debugging and log capturing reasons.
- JSONCodec struct {
- // level enables verbose logging or all incoming and outcoming messages.
- level DebugLevel
+var json = j.ConfigCompatibleWithStandardLibrary
- // logger renders messages when debug enabled.
- logger logger.Logger
- }
+// JSONCodec can be used for debugging and log capturing reasons.
+type JSONCodec struct {
+ // level enables verbose logging or all incoming and outcoming messages.
+ level DebugLevel
- // jsonFrame contains message command in binary form.
- jsonFrame struct {
- // ID contains ID of the command, response or error.
- ID uint64 `json:"id"`
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+}
- // Command name. Optional.
- Command string `json:"command,omitempty"`
+// jsonFrame contains message command in binary form.
+type jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
- // Options to be unmarshalled to body (raw payload).
- Options jsoniter.RawMessage `json:"options,omitempty"`
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
- // Failure associated with command id.
- Failure []byte `json:"failure,omitempty"`
+ // Options to be unmarshalled to body (raw payload).
+ Options j.RawMessage `json:"options,omitempty"`
- // Payloads specific to the command or result.
- Payloads []byte `json:"payloads,omitempty"`
- }
-)
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+}
// NewJSONCodec creates new Json communication codec.
func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
@@ -62,21 +62,20 @@ func (c *JSONCodec) GetName() string {
// Execute exchanges commands with worker.
func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ const op = errors.Op("json_codec_execute")
if len(msg) == 0 {
return nil, nil
}
- var (
- response = make([]jsonFrame, 0, 5)
- result = make([]Message, 0, 5)
- err error
- )
+ var response = make([]jsonFrame, 0, 5)
+ var result = make([]Message, 0, 5)
+ var err error
frames := make([]jsonFrame, 0, len(msg))
for _, m := range msg {
frame, err := c.packFrame(m)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
frames = append(frames, frame)
@@ -88,14 +87,14 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
p.Context = []byte("null")
}
- p.Context, err = jsoniter.Marshal(ctx)
+ p.Context, err = json.Marshal(ctx)
if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
+ return nil, errors.E(op, err)
}
- p.Body, err = jsoniter.Marshal(frames)
+ p.Body, err = json.Marshal(frames)
if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
+ return nil, errors.E(op, err)
}
if c.level >= DebugNormal {
@@ -109,7 +108,7 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
out, err := e.Exec(p)
if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
+ return nil, errors.E(op, err)
}
if len(out.Body) == 0 {
@@ -126,15 +125,15 @@ func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message,
c.logger.Debug(logMessage, "receive", true)
}
- err = jsoniter.Unmarshal(out.Body, &response)
+ err = json.Unmarshal(out.Body, &response)
if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
+ return nil, errors.E(op, err)
}
for _, f := range response {
msg, err := c.parseFrame(f)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
result = append(result, msg)
@@ -174,7 +173,7 @@ func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
return jsonFrame{}, err
}
- frame.Options, err = jsoniter.Marshal(msg.Command)
+ frame.Options, err = json.Marshal(msg.Command)
if err != nil {
return jsonFrame{}, err
}
@@ -214,7 +213,7 @@ func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
return Message{}, err
}
- err = jsoniter.Unmarshal(frame.Options, &cmd)
+ err = json.Unmarshal(frame.Options, &cmd)
if err != nil {
return Message{}, err
}
diff --git a/plugins/temporal/protocol/message.go b/plugins/temporal/protocol/message.go
index 4568fd1d..d5e0f49d 100644
--- a/plugins/temporal/protocol/message.go
+++ b/plugins/temporal/protocol/message.go
@@ -39,8 +39,7 @@ const (
)
// GetWorkerInfo reads worker information.
-type GetWorkerInfo struct {
-}
+type GetWorkerInfo struct{}
// InvokeActivity invokes activity.
type InvokeActivity struct {
@@ -218,6 +217,7 @@ func (cmd NewTimer) ToDuration() time.Duration {
// returns command name (only for the commands sent to the worker)
func commandName(cmd interface{}) (string, error) {
+ const op = errors.Op("command_name")
switch cmd.(type) {
case GetWorkerInfo, *GetWorkerInfo:
return getWorkerInfoCommand, nil
@@ -260,12 +260,13 @@ func commandName(cmd interface{}) (string, error) {
case Panic, *Panic:
return panicCommand, nil
default:
- return "", errors.E(errors.Op("commandName"), "undefined command type", cmd)
+ return "", errors.E(op, errors.Errorf("undefined command type: %s", cmd))
}
}
// reads command from binary payload
func initCommand(name string) (interface{}, error) {
+ const op = errors.Op("init_command")
switch name {
case getWorkerInfoCommand:
return &GetWorkerInfo{}, nil
@@ -328,6 +329,6 @@ func initCommand(name string) (interface{}, error) {
return &Panic{}, nil
default:
- return nil, errors.E(errors.Op("initCommand"), "undefined command type", name)
+ return nil, errors.E(op, errors.Errorf("undefined command name: %s", name))
}
}
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
index b41f02b6..607fe0fe 100644
--- a/plugins/temporal/protocol/proto_codec.go
+++ b/plugins/temporal/protocol/proto_codec.go
@@ -120,6 +120,7 @@ func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
}
func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
+ const op = errors.Op("proto_codec_parse_message")
var err error
msg := Message{
@@ -131,12 +132,12 @@ func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
if frame.Command != "" {
msg.Command, err = initCommand(frame.Command)
if err != nil {
- return Message{}, err
+ return Message{}, errors.E(op, err)
}
err = jsoniter.Unmarshal(frame.Options, &msg.Command)
if err != nil {
- return Message{}, err
+ return Message{}, errors.E(op, err)
}
}
diff --git a/plugins/temporal/protocol/worker_info.go b/plugins/temporal/protocol/worker_info.go
index 6dfcd81f..58a0ae66 100644
--- a/plugins/temporal/protocol/worker_info.go
+++ b/plugins/temporal/protocol/worker_info.go
@@ -47,7 +47,7 @@ func FetchWorkerInfo(c Codec, e Endpoint, dc converter.DataConverter) ([]WorkerI
result, err := c.Execute(e, Context{}, Message{ID: 0, Command: GetWorkerInfo{}})
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
if len(result) != 1 {