summaryrefslogtreecommitdiff
path: root/plugins/jobs/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r--plugins/jobs/protocol.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index 691369d0..9d769fdf 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -10,8 +10,8 @@ import (
type Type uint32
const (
- Error Type = iota
- NoError
+ NoError Type = iota
+ Error
)
// internal worker protocol (jobs mode)
@@ -19,7 +19,7 @@ type protocol struct {
// message type, see Type
T Type `json:"type"`
// Payload
- Data []byte `json:"data"`
+ Data json.RawMessage `json:"data"`
}
type errorResp struct {
@@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
return errors.E(op, err)
}
- log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+ log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
if er.Requeue {
err = jb.Requeue(er.Headers, er.Delay)
@@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
+
+ return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
+
default:
err = jb.Ack()
if err != nil {