summaryrefslogtreecommitdiff
path: root/plugins/jobs/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r--plugins/jobs/protocol.go78
1 files changed, 0 insertions, 78 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
deleted file mode 100644
index 9d769fdf..00000000
--- a/plugins/jobs/protocol.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package jobs
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- pq "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type Type uint32
-
-const (
- NoError Type = iota
- Error
-)
-
-// internal worker protocol (jobs mode)
-type protocol struct {
- // message type, see Type
- T Type `json:"type"`
- // Payload
- Data json.RawMessage `json:"data"`
-}
-
-type errorResp struct {
- Msg string `json:"message"`
- Requeue bool `json:"requeue"`
- Delay int64 `json:"delay_seconds"`
- Headers map[string][]string `json:"headers"`
-}
-
-func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
- const op = errors.Op("jobs_handle_response")
- // TODO(rustatian) to sync.Pool
- p := &protocol{}
-
- err := json.Unmarshal(resp, p)
- if err != nil {
- return errors.E(op, err)
- }
-
- switch p.T {
- // likely case
- case NoError:
- err = jb.Ack()
- if err != nil {
- return errors.E(op, err)
- }
- case Error:
- // TODO(rustatian) to sync.Pool
- er := &errorResp{}
-
- err = json.Unmarshal(p.Data, er)
- if err != nil {
- return errors.E(op, err)
- }
-
- log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
-
- if er.Requeue {
- err = jb.Requeue(er.Headers, er.Delay)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
-
- default:
- err = jb.Ack()
- if err != nil {
- return errors.E(op, err)
- }
- }
-
- return nil
-}