diff options
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r-- | plugins/jobs/protocol.go | 78 |
1 files changed, 0 insertions, 78 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go deleted file mode 100644 index 9d769fdf..00000000 --- a/plugins/jobs/protocol.go +++ /dev/null @@ -1,78 +0,0 @@ -package jobs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/errors" - pq "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type Type uint32 - -const ( - NoError Type = iota - Error -) - -// internal worker protocol (jobs mode) -type protocol struct { - // message type, see Type - T Type `json:"type"` - // Payload - Data json.RawMessage `json:"data"` -} - -type errorResp struct { - Msg string `json:"message"` - Requeue bool `json:"requeue"` - Delay int64 `json:"delay_seconds"` - Headers map[string][]string `json:"headers"` -} - -func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { - const op = errors.Op("jobs_handle_response") - // TODO(rustatian) to sync.Pool - p := &protocol{} - - err := json.Unmarshal(resp, p) - if err != nil { - return errors.E(op, err) - } - - switch p.T { - // likely case - case NoError: - err = jb.Ack() - if err != nil { - return errors.E(op, err) - } - case Error: - // TODO(rustatian) to sync.Pool - er := &errorResp{} - - err = json.Unmarshal(p.Data, er) - if err != nil { - return errors.E(op, err) - } - - log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) - - if er.Requeue { - err = jb.Requeue(er.Headers, er.Delay) - if err != nil { - return errors.E(op, err) - } - return nil - } - - return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) - - default: - err = jb.Ack() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} |