From 2d460062c97f9ad1e793831c54ae4d177dea83e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 22:03:34 +0300 Subject: Durable requeue algo. Update AMQP and Beanstalk tests to use mock logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski --- plugins/jobs/protocol.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'plugins/jobs/protocol.go') diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index 691369d0..9d769fdf 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -10,8 +10,8 @@ import ( type Type uint32 const ( - Error Type = iota - NoError + NoError Type = iota + Error ) // internal worker protocol (jobs mode) @@ -19,7 +19,7 @@ type protocol struct { // message type, see Type T Type `json:"type"` // Payload - Data []byte `json:"data"` + Data json.RawMessage `json:"data"` } type errorResp struct { @@ -55,7 +55,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { return errors.E(op, err) } - log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) if er.Requeue { err = jb.Requeue(er.Headers, er.Delay) @@ -64,6 +64,9 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { } return nil } + + return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg)) + default: err = jb.Ack() if err != nil { -- cgit v1.2.3