diff options
Diffstat (limited to 'plugins/temporal/protocol/json_codec.go')
-rw-r--r-- | plugins/temporal/protocol/json_codec.go | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go new file mode 100644 index 00000000..e7a77068 --- /dev/null +++ b/plugins/temporal/protocol/json_codec.go @@ -0,0 +1,225 @@ +package protocol + +import ( + "github.com/fatih/color" + j "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/logger" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/failure/v1" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +// JSONCodec can be used for debugging and log capturing reasons. +type JSONCodec struct { + // level enables verbose logging or all incoming and outcoming messages. + level DebugLevel + + // logger renders messages when debug enabled. + logger logger.Logger +} + +// jsonFrame contains message command in binary form. +type jsonFrame struct { + // ID contains ID of the command, response or error. + ID uint64 `json:"id"` + + // Command name. Optional. + Command string `json:"command,omitempty"` + + // Options to be unmarshalled to body (raw payload). + Options j.RawMessage `json:"options,omitempty"` + + // Failure associated with command id. + Failure []byte `json:"failure,omitempty"` + + // Payloads specific to the command or result. + Payloads []byte `json:"payloads,omitempty"` +} + +// NewJSONCodec creates new Json communication codec. +func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { + return &JSONCodec{ + level: level, + logger: logger, + } +} + +// WithLogger creates new codes instance with attached logger. +func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { + return &JSONCodec{ + level: c.level, + logger: logger, + } +} + +// GetName returns codec name. +func (c *JSONCodec) GetName() string { + return "json" +} + +// Execute exchanges commands with worker. +func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { + const op = errors.Op("json_codec_execute") + if len(msg) == 0 { + return nil, nil + } + + var response = make([]jsonFrame, 0, 5) + var result = make([]Message, 0, 5) + var err error + + frames := make([]jsonFrame, 0, len(msg)) + for _, m := range msg { + frame, err := c.packFrame(m) + if err != nil { + return nil, errors.E(op, err) + } + + frames = append(frames, frame) + } + + p := payload.Payload{} + + if ctx.IsEmpty() { + p.Context = []byte("null") + } + + p.Context, err = json.Marshal(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + p.Body, err = json.Marshal(frames) + if err != nil { + return nil, errors.E(op, err) + } + + if c.level >= DebugNormal { + logMessage := string(p.Body) + " " + string(p.Context) + if c.level >= DebugHumanized { + logMessage = color.GreenString(logMessage) + } + + c.logger.Debug(logMessage) + } + + out, err := e.Exec(p) + if err != nil { + return nil, errors.E(op, err) + } + + if len(out.Body) == 0 { + // worker inactive or closed + return nil, nil + } + + if c.level >= DebugNormal { + logMessage := string(out.Body) + if c.level >= DebugHumanized { + logMessage = color.HiYellowString(logMessage) + } + + c.logger.Debug(logMessage, "receive", true) + } + + err = json.Unmarshal(out.Body, &response) + if err != nil { + return nil, errors.E(op, err) + } + + for _, f := range response { + msg, err := c.parseFrame(f) + if err != nil { + return nil, errors.E(op, err) + } + + result = append(result, msg) + } + + return result, nil +} + +func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { + var ( + err error + frame jsonFrame + ) + + frame.ID = msg.ID + + if msg.Payloads != nil { + frame.Payloads, err = msg.Payloads.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Failure != nil { + frame.Failure, err = msg.Failure.Marshal() + if err != nil { + return jsonFrame{}, err + } + } + + if msg.Command == nil { + return frame, nil + } + + frame.Command, err = commandName(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + frame.Options, err = json.Marshal(msg.Command) + if err != nil { + return jsonFrame{}, err + } + + return frame, nil +} + +func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { + var ( + err error + msg Message + ) + + msg.ID = frame.ID + + if frame.Payloads != nil { + msg.Payloads = &common.Payloads{} + + err = msg.Payloads.Unmarshal(frame.Payloads) + if err != nil { + return Message{}, err + } + } + + if frame.Failure != nil { + msg.Failure = &failure.Failure{} + + err = msg.Failure.Unmarshal(frame.Failure) + if err != nil { + return Message{}, err + } + } + + if frame.Command != "" { + cmd, err := initCommand(frame.Command) + if err != nil { + return Message{}, err + } + + err = json.Unmarshal(frame.Options, &cmd) + if err != nil { + return Message{}, err + } + + msg.Command = cmd + } + + return msg, nil +} |