summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol/json_codec.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/protocol/json_codec.go')
-rw-r--r--plugins/temporal/protocol/json_codec.go225
1 files changed, 225 insertions, 0 deletions
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go
new file mode 100644
index 00000000..e7a77068
--- /dev/null
+++ b/plugins/temporal/protocol/json_codec.go
@@ -0,0 +1,225 @@
+package protocol
+
+import (
+ "github.com/fatih/color"
+ j "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "go.temporal.io/api/common/v1"
+ "go.temporal.io/api/failure/v1"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+// JSONCodec can be used for debugging and log capturing reasons.
+type JSONCodec struct {
+ // level enables verbose logging or all incoming and outcoming messages.
+ level DebugLevel
+
+ // logger renders messages when debug enabled.
+ logger logger.Logger
+}
+
+// jsonFrame contains message command in binary form.
+type jsonFrame struct {
+ // ID contains ID of the command, response or error.
+ ID uint64 `json:"id"`
+
+ // Command name. Optional.
+ Command string `json:"command,omitempty"`
+
+ // Options to be unmarshalled to body (raw payload).
+ Options j.RawMessage `json:"options,omitempty"`
+
+ // Failure associated with command id.
+ Failure []byte `json:"failure,omitempty"`
+
+ // Payloads specific to the command or result.
+ Payloads []byte `json:"payloads,omitempty"`
+}
+
+// NewJSONCodec creates new Json communication codec.
+func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: level,
+ logger: logger,
+ }
+}
+
+// WithLogger creates new codes instance with attached logger.
+func (c *JSONCodec) WithLogger(logger logger.Logger) Codec {
+ return &JSONCodec{
+ level: c.level,
+ logger: logger,
+ }
+}
+
+// GetName returns codec name.
+func (c *JSONCodec) GetName() string {
+ return "json"
+}
+
+// Execute exchanges commands with worker.
+func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
+ const op = errors.Op("json_codec_execute")
+ if len(msg) == 0 {
+ return nil, nil
+ }
+
+ var response = make([]jsonFrame, 0, 5)
+ var result = make([]Message, 0, 5)
+ var err error
+
+ frames := make([]jsonFrame, 0, len(msg))
+ for _, m := range msg {
+ frame, err := c.packFrame(m)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ frames = append(frames, frame)
+ }
+
+ p := payload.Payload{}
+
+ if ctx.IsEmpty() {
+ p.Context = []byte("null")
+ }
+
+ p.Context, err = json.Marshal(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.Body, err = json.Marshal(frames)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(p.Body) + " " + string(p.Context)
+ if c.level >= DebugHumanized {
+ logMessage = color.GreenString(logMessage)
+ }
+
+ c.logger.Debug(logMessage)
+ }
+
+ out, err := e.Exec(p)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if len(out.Body) == 0 {
+ // worker inactive or closed
+ return nil, nil
+ }
+
+ if c.level >= DebugNormal {
+ logMessage := string(out.Body)
+ if c.level >= DebugHumanized {
+ logMessage = color.HiYellowString(logMessage)
+ }
+
+ c.logger.Debug(logMessage, "receive", true)
+ }
+
+ err = json.Unmarshal(out.Body, &response)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ for _, f := range response {
+ msg, err := c.parseFrame(f)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ result = append(result, msg)
+ }
+
+ return result, nil
+}
+
+func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) {
+ var (
+ err error
+ frame jsonFrame
+ )
+
+ frame.ID = msg.ID
+
+ if msg.Payloads != nil {
+ frame.Payloads, err = msg.Payloads.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Failure != nil {
+ frame.Failure, err = msg.Failure.Marshal()
+ if err != nil {
+ return jsonFrame{}, err
+ }
+ }
+
+ if msg.Command == nil {
+ return frame, nil
+ }
+
+ frame.Command, err = commandName(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ frame.Options, err = json.Marshal(msg.Command)
+ if err != nil {
+ return jsonFrame{}, err
+ }
+
+ return frame, nil
+}
+
+func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) {
+ var (
+ err error
+ msg Message
+ )
+
+ msg.ID = frame.ID
+
+ if frame.Payloads != nil {
+ msg.Payloads = &common.Payloads{}
+
+ err = msg.Payloads.Unmarshal(frame.Payloads)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Failure != nil {
+ msg.Failure = &failure.Failure{}
+
+ err = msg.Failure.Unmarshal(frame.Failure)
+ if err != nil {
+ return Message{}, err
+ }
+ }
+
+ if frame.Command != "" {
+ cmd, err := initCommand(frame.Command)
+ if err != nil {
+ return Message{}, err
+ }
+
+ err = json.Unmarshal(frame.Options, &cmd)
+ if err != nil {
+ return Message{}, err
+ }
+
+ msg.Command = cmd
+ }
+
+ return msg, nil
+}