diff options
author | Valery Piashchynski <[email protected]> | 2021-08-10 19:54:03 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-10 19:54:03 +0300 |
commit | a8a7f4194156440ef3157d8e5d75c43ed0327bcf (patch) | |
tree | 9bc4240fb3c6f02682420689490f56d681d4b545 /plugins/jobs/protocol.go | |
parent | d379c28a1e9babead0266bc4fa10d6c5e7aa14cb (diff) |
Add jobs protocol support for the AMQP driver
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r-- | plugins/jobs/protocol.go | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go index adfd0784..e27f2868 100644 --- a/plugins/jobs/protocol.go +++ b/plugins/jobs/protocol.go @@ -25,22 +25,30 @@ type protocol struct { type errorResp struct { Msg string `json:"message"` Requeue bool `json:"requeue"` - Delay uint32 `json:"delay_seconds"` + Delay int64 `json:"delay_seconds"` } func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { const op = errors.Op("jobs_handle_response") // TODO(rustatian) to sync.Pool p := &protocol{} + err := json.Unmarshal(resp, p) if err != nil { return errors.E(op, err) } switch p.T { + // likely case + case NoError: + err = jb.Ack() + if err != nil { + return errors.E(op, err) + } case Error: // TODO(rustatian) to sync.Pool er := &errorResp{} + err = json.Unmarshal(p.Data, er) if err != nil { return errors.E(op, err) @@ -55,8 +63,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { } return nil } - - case NoError: + default: err = jb.Ack() if err != nil { return errors.E(op, err) |