summaryrefslogtreecommitdiff
path: root/plugins/jobs/protocol.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-10 19:54:03 +0300
committerValery Piashchynski <[email protected]>2021-08-10 19:54:03 +0300
commita8a7f4194156440ef3157d8e5d75c43ed0327bcf (patch)
tree9bc4240fb3c6f02682420689490f56d681d4b545 /plugins/jobs/protocol.go
parentd379c28a1e9babead0266bc4fa10d6c5e7aa14cb (diff)
Add jobs protocol support for the AMQP driver
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/protocol.go')
-rw-r--r--plugins/jobs/protocol.go13
1 files changed, 10 insertions, 3 deletions
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index adfd0784..e27f2868 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -25,22 +25,30 @@ type protocol struct {
type errorResp struct {
Msg string `json:"message"`
Requeue bool `json:"requeue"`
- Delay uint32 `json:"delay_seconds"`
+ Delay int64 `json:"delay_seconds"`
}
func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
const op = errors.Op("jobs_handle_response")
// TODO(rustatian) to sync.Pool
p := &protocol{}
+
err := json.Unmarshal(resp, p)
if err != nil {
return errors.E(op, err)
}
switch p.T {
+ // likely case
+ case NoError:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
case Error:
// TODO(rustatian) to sync.Pool
er := &errorResp{}
+
err = json.Unmarshal(p.Data, er)
if err != nil {
return errors.E(op, err)
@@ -55,8 +63,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
-
- case NoError:
+ default:
err = jb.Ack()
if err != nil {
return errors.E(op, err)