summaryrefslogtreecommitdiff
path: root/plugins/jobs/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r--plugins/jobs/protocol.go67
1 files changed, 67 insertions, 0 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
new file mode 100644
index 00000000..adfd0784
--- /dev/null
+++ b/plugins/jobs/protocol.go
@@ -0,0 +1,67 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ pq "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Type uint32
+
+const (
+ Error Type = iota
+ NoError
+)
+
+// internal worker protocol (jobs mode)
+type protocol struct {
+ // message type, see Type
+ T Type `json:"type"`
+ // Payload
+ Data []byte `json:"data"`
+}
+
+type errorResp struct {
+ Msg string `json:"message"`
+ Requeue bool `json:"requeue"`
+ Delay uint32 `json:"delay_seconds"`
+}
+
+func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
+ const op = errors.Op("jobs_handle_response")
+ // TODO(rustatian) to sync.Pool
+ p := &protocol{}
+ err := json.Unmarshal(resp, p)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ switch p.T {
+ case Error:
+ // TODO(rustatian) to sync.Pool
+ er := &errorResp{}
+ err = json.Unmarshal(p.Data, er)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+
+ if er.Requeue {
+ err = jb.Requeue(er.Delay)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ case NoError:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}