diff options
Diffstat (limited to 'plugins/temporal/protocol/json_codec.go')
-rw-r--r-- | plugins/temporal/protocol/json_codec.go | 225 |
1 files changed, 0 insertions, 225 deletions
diff --git a/plugins/temporal/protocol/json_codec.go b/plugins/temporal/protocol/json_codec.go deleted file mode 100644 index e7a77068..00000000 --- a/plugins/temporal/protocol/json_codec.go +++ /dev/null @@ -1,225 +0,0 @@ -package protocol - -import ( - "github.com/fatih/color" - j "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/logger" - "go.temporal.io/api/common/v1" - "go.temporal.io/api/failure/v1" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -// JSONCodec can be used for debugging and log capturing reasons. -type JSONCodec struct { - // level enables verbose logging or all incoming and outcoming messages. - level DebugLevel - - // logger renders messages when debug enabled. - logger logger.Logger -} - -// jsonFrame contains message command in binary form. -type jsonFrame struct { - // ID contains ID of the command, response or error. - ID uint64 `json:"id"` - - // Command name. Optional. - Command string `json:"command,omitempty"` - - // Options to be unmarshalled to body (raw payload). - Options j.RawMessage `json:"options,omitempty"` - - // Failure associated with command id. - Failure []byte `json:"failure,omitempty"` - - // Payloads specific to the command or result. - Payloads []byte `json:"payloads,omitempty"` -} - -// NewJSONCodec creates new Json communication codec. -func NewJSONCodec(level DebugLevel, logger logger.Logger) Codec { - return &JSONCodec{ - level: level, - logger: logger, - } -} - -// WithLogger creates new codes instance with attached logger. -func (c *JSONCodec) WithLogger(logger logger.Logger) Codec { - return &JSONCodec{ - level: c.level, - logger: logger, - } -} - -// GetName returns codec name. -func (c *JSONCodec) GetName() string { - return "json" -} - -// Execute exchanges commands with worker. -func (c *JSONCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) { - const op = errors.Op("json_codec_execute") - if len(msg) == 0 { - return nil, nil - } - - var response = make([]jsonFrame, 0, 5) - var result = make([]Message, 0, 5) - var err error - - frames := make([]jsonFrame, 0, len(msg)) - for _, m := range msg { - frame, err := c.packFrame(m) - if err != nil { - return nil, errors.E(op, err) - } - - frames = append(frames, frame) - } - - p := payload.Payload{} - - if ctx.IsEmpty() { - p.Context = []byte("null") - } - - p.Context, err = json.Marshal(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - p.Body, err = json.Marshal(frames) - if err != nil { - return nil, errors.E(op, err) - } - - if c.level >= DebugNormal { - logMessage := string(p.Body) + " " + string(p.Context) - if c.level >= DebugHumanized { - logMessage = color.GreenString(logMessage) - } - - c.logger.Debug(logMessage) - } - - out, err := e.Exec(p) - if err != nil { - return nil, errors.E(op, err) - } - - if len(out.Body) == 0 { - // worker inactive or closed - return nil, nil - } - - if c.level >= DebugNormal { - logMessage := string(out.Body) - if c.level >= DebugHumanized { - logMessage = color.HiYellowString(logMessage) - } - - c.logger.Debug(logMessage, "receive", true) - } - - err = json.Unmarshal(out.Body, &response) - if err != nil { - return nil, errors.E(op, err) - } - - for _, f := range response { - msg, err := c.parseFrame(f) - if err != nil { - return nil, errors.E(op, err) - } - - result = append(result, msg) - } - - return result, nil -} - -func (c *JSONCodec) packFrame(msg Message) (jsonFrame, error) { - var ( - err error - frame jsonFrame - ) - - frame.ID = msg.ID - - if msg.Payloads != nil { - frame.Payloads, err = msg.Payloads.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Failure != nil { - frame.Failure, err = msg.Failure.Marshal() - if err != nil { - return jsonFrame{}, err - } - } - - if msg.Command == nil { - return frame, nil - } - - frame.Command, err = commandName(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - frame.Options, err = json.Marshal(msg.Command) - if err != nil { - return jsonFrame{}, err - } - - return frame, nil -} - -func (c *JSONCodec) parseFrame(frame jsonFrame) (Message, error) { - var ( - err error - msg Message - ) - - msg.ID = frame.ID - - if frame.Payloads != nil { - msg.Payloads = &common.Payloads{} - - err = msg.Payloads.Unmarshal(frame.Payloads) - if err != nil { - return Message{}, err - } - } - - if frame.Failure != nil { - msg.Failure = &failure.Failure{} - - err = msg.Failure.Unmarshal(frame.Failure) - if err != nil { - return Message{}, err - } - } - - if frame.Command != "" { - cmd, err := initCommand(frame.Command) - if err != nil { - return Message{}, err - } - - err = json.Unmarshal(frame.Options, &cmd) - if err != nil { - return Message{}, err - } - - msg.Command = cmd - } - - return msg, nil -} |