diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/protocol.go | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r-- | plugins/jobs/protocol.go | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index 691369d0..9d769fdf 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -10,8 +10,8 @@ import ( type Type uint32 const ( - Error Type = iota - NoError + NoError Type = iota + Error ) // internal worker protocol (jobs mode) @@ -19,7 +19,7 @@ type protocol struct { // message type, see Type T Type `json:"type"` // Payload - Data []byte `json:"data"` + Data json.RawMessage `json:"data"` } type errorResp struct { @@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { return errors.E(op, err) } - log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) if er.Requeue { err = jb.Requeue(er.Headers, er.Delay) @@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { } return nil } + + return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) + default: err = jb.Ack() if err != nil { |