summaryrefslogtreecommitdiff
path: root/plugins/temporal/protocol/proto_codec.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/temporal/protocol/proto_codec.go')
-rw-r--r--plugins/temporal/protocol/proto_codec.go145
1 files changed, 0 insertions, 145 deletions
diff --git a/plugins/temporal/protocol/proto_codec.go b/plugins/temporal/protocol/proto_codec.go
deleted file mode 100644
index 607fe0fe..00000000
--- a/plugins/temporal/protocol/proto_codec.go
+++ /dev/null
@@ -1,145 +0,0 @@
-package protocol
-
-import (
- v1 "github.com/golang/protobuf/proto" //nolint:staticcheck
- jsoniter "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/temporal/protocol/internal"
- "google.golang.org/protobuf/proto"
-)
-
-type (
- // ProtoCodec uses protobuf to exchange messages with underlying workers.
- ProtoCodec struct {
- }
-)
-
-// NewProtoCodec creates new Proto communication codec.
-func NewProtoCodec() Codec {
- return &ProtoCodec{}
-}
-
-// WithLogger creates new codes instance with attached logger.
-func (c *ProtoCodec) WithLogger(logger logger.Logger) Codec {
- return &ProtoCodec{}
-}
-
-// GetName returns codec name.
-func (c *ProtoCodec) GetName() string {
- return "protobuf"
-}
-
-// Execute exchanges commands with worker.
-func (c *ProtoCodec) Execute(e Endpoint, ctx Context, msg ...Message) ([]Message, error) {
- if len(msg) == 0 {
- return nil, nil
- }
-
- var request = &internal.Frame{}
- var response = &internal.Frame{}
- var result = make([]Message, 0, 5)
- var err error
-
- for _, m := range msg {
- frame, err := c.packMessage(m)
- if err != nil {
- return nil, err
- }
-
- request.Messages = append(request.Messages, frame)
- }
-
- p := payload.Payload{}
-
- // context is always in json format
- if ctx.IsEmpty() {
- p.Context = []byte("null")
- }
-
- p.Context, err = jsoniter.Marshal(ctx)
- if err != nil {
- return nil, errors.E(errors.Op("encodeContext"), err)
- }
-
- p.Body, err = proto.Marshal(v1.MessageV2(request))
- if err != nil {
- return nil, errors.E(errors.Op("encodePayload"), err)
- }
-
- out, err := e.Exec(p)
- if err != nil {
- return nil, errors.E(errors.Op("execute"), err)
- }
-
- if len(out.Body) == 0 {
- // worker inactive or closed
- return nil, nil
- }
-
- err = proto.Unmarshal(out.Body, v1.MessageV2(response))
- if err != nil {
- return nil, errors.E(errors.Op("parseResponse"), err)
- }
-
- for _, f := range response.Messages {
- msg, err := c.parseMessage(f)
- if err != nil {
- return nil, err
- }
-
- result = append(result, msg)
- }
-
- return result, nil
-}
-
-func (c *ProtoCodec) packMessage(msg Message) (*internal.Message, error) {
- var err error
-
- frame := &internal.Message{
- Id: msg.ID,
- Payloads: msg.Payloads,
- Failure: msg.Failure,
- }
-
- if msg.Command != nil {
- frame.Command, err = commandName(msg.Command)
- if err != nil {
- return nil, err
- }
-
- frame.Options, err = jsoniter.Marshal(msg.Command)
- if err != nil {
- return nil, err
- }
- }
-
- return frame, nil
-}
-
-func (c *ProtoCodec) parseMessage(frame *internal.Message) (Message, error) {
- const op = errors.Op("proto_codec_parse_message")
- var err error
-
- msg := Message{
- ID: frame.Id,
- Payloads: frame.Payloads,
- Failure: frame.Failure,
- }
-
- if frame.Command != "" {
- msg.Command, err = initCommand(frame.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
-
- err = jsoniter.Unmarshal(frame.Options, &msg.Command)
- if err != nil {
- return Message{}, errors.E(op, err)
- }
- }
-
- return msg, nil
-}