summaryrefslogtreecommitdiff
path: root/plugins/jobs/protocol.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/protocol.go
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r--plugins/jobs/protocol.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index 691369d0..9d769fdf 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -10,8 +10,8 @@ import (
type Type uint32
const (
- Error Type = iota
- NoError
+ NoError Type = iota
+ Error
)
// internal worker protocol (jobs mode)
@@ -19,7 +19,7 @@ type protocol struct {
// message type, see Type
T Type `json:"type"`
// Payload
- Data []byte `json:"data"`
+ Data json.RawMessage `json:"data"`
}
type errorResp struct {
@@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
return errors.E(op, err)
}
- log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+ log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
if er.Requeue {
err = jb.Requeue(er.Headers, er.Delay)
@@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
+
+ return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
+
default:
err = jb.Ack()
if err != nil {