diff options
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r-- | plugins/jobs/protocol.go | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go new file mode 100644 index 00000000..adfd0784 --- /dev/null +++ b/plugins/jobs/protocol.go @@ -0,0 +1,67 @@ +package jobs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/errors" + pq "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Type uint32 + +const ( + Error Type = iota + NoError +) + +// internal worker protocol (jobs mode) +type protocol struct { + // message type, see Type + T Type `json:"type"` + // Payload + Data []byte `json:"data"` +} + +type errorResp struct { + Msg string `json:"message"` + Requeue bool `json:"requeue"` + Delay uint32 `json:"delay_seconds"` +} + +func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { + const op = errors.Op("jobs_handle_response") + // TODO(rustatian) to sync.Pool + p := &protocol{} + err := json.Unmarshal(resp, p) + if err != nil { + return errors.E(op, err) + } + + switch p.T { + case Error: + // TODO(rustatian) to sync.Pool + er := &errorResp{} + err = json.Unmarshal(p.Data, er) + if err != nil { + return errors.E(op, err) + } + + log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + + if er.Requeue { + err = jb.Requeue(er.Delay) + if err != nil { + return errors.E(op, err) + } + return nil + } + + case NoError: + err = jb.Ack() + if err != nil { + return errors.E(op, err) + } + } + + return nil +} |