diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 41 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 4 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 31 | ||||
-rw-r--r-- | plugins/jobs/protocol.go | 67 |
6 files changed, 127 insertions, 44 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b544620..a7e2c4e5 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -70,22 +70,22 @@ func (o *Options) TimeoutDuration() time.Duration { return time.Second * time.Duration(o.Timeout) } -func (j *Item) ID() string { - return j.Ident +func (i *Item) ID() string { + return i.Ident } -func (j *Item) Priority() int64 { - return j.Options.Priority +func (i *Item) Priority() int64 { + return i.Options.Priority } // Body packs job payload into binary payload. -func (j *Item) Body() []byte { - return utils.AsBytes(j.Payload) +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) } // Context packs job context (job, id) into binary payload. // Not used in the amqp, amqp.Table used instead -func (j *Item) Context() ([]byte, error) { +func (i *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { ID string `json:"id"` @@ -93,7 +93,7 @@ func (j *Item) Context() ([]byte, error) { Headers map[string][]string `json:"headers"` Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, ) if err != nil { @@ -103,14 +103,20 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() error { - return j.AckFunc(j.Options.multipleAsk) +func (i *Item) Ack() error { + return i.AckFunc(i.Options.multipleAsk) } -func (j *Item) Nack() error { - return j.NackFunc(false, j.Options.requeue) +func (i *Item) Nack() error { + return i.NackFunc(false, i.Options.requeue) } +// Requeue with the provided delay, handled by the Nack +func (i *Item) Requeue(_ uint32) error { + return nil +} + +// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") item, err := j.unpack(d) @@ -118,11 +124,12 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { return nil, errors.E(op, err) } return &Item{ - Job: item.Job, - Ident: item.Ident, - Payload: item.Payload, - Headers: item.Headers, - Options: item.Options, + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + // internal AckFunc: d.Ack, NackFunc: d.Nack, }, nil diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index fc0ee86b..eb532e08 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -103,6 +103,10 @@ func (i *Item) Nack() error { return i.Options.conn.Delete(i.Options.id) } +func (i *Item) Requeue(_ uint32) error { + return nil +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 442533c5..ebf16524 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -69,21 +69,21 @@ func (o *Options) TimeoutDuration() time.Duration { return time.Second * time.Duration(o.Timeout) } -func (j *Item) ID() string { - return j.Ident +func (i *Item) ID() string { + return i.Ident } -func (j *Item) Priority() int64 { - return j.Options.Priority +func (i *Item) Priority() int64 { + return i.Options.Priority } // Body packs job payload into binary payload. -func (j *Item) Body() []byte { - return utils.AsBytes(j.Payload) +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) } // Context packs job context (job, id) into binary payload. -func (j *Item) Context() ([]byte, error) { +func (i *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { ID string `json:"id"` @@ -91,7 +91,7 @@ func (j *Item) Context() ([]byte, error) { Headers map[string][]string `json:"headers"` Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, ) if err != nil { @@ -101,12 +101,16 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() error { +func (i *Item) Ack() error { // noop for the in-memory return nil } -func (j *Item) Nack() error { +func (i *Item) Nack() error { // noop for the in-memory return nil } + +func (i *Item) Requeue(_ uint32) error { + return nil +} diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 325c4781..50d8ac18 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -156,6 +156,10 @@ func (i *Item) Nack() error { return nil } +func (i *Item) Requeue(_ uint32) error { + return nil +} + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6c848a9d..61936db2 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -18,7 +18,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -81,6 +80,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) + p.pldPool = sync.Pool{New: func() interface{} { // with nil fields return &payload.Payload{} @@ -104,8 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload() *payload.Payload { - return p.pldPool.Get().(*payload.Payload) +func (p *Plugin) getPayload(body, context []byte) *payload.Payload { + pld := p.pldPool.Get().(*payload.Payload) + pld.Body = body + pld.Context = context + return pld } func (p *Plugin) putPayload(pld *payload.Payload) { @@ -219,13 +222,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } // get payload from the sync.Pool - exec := p.getPayload() - exec.Body = jb.Body() - exec.Context = ctx - - // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- - // remove in tests - p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + exec := p.getPayload(jb.Body(), ctx) // protect from the pool reset p.RLock() @@ -243,14 +240,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- - // remove in tests - p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) - - errAck := jb.Ack() - if errAck != nil { - p.log.Error("acknowledge failed", "error", errAck) + // handle the response protocol + err = handleResponse(resp.Body, jb, p.log) + if err != nil { p.putPayload(exec) + errNack := jb.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed, job might be lost", "error", errNack) + } continue } diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go new file mode 100644 index 00000000..adfd0784 --- /dev/null +++ b/plugins/jobs/protocol.go @@ -0,0 +1,67 @@ +package jobs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/errors" + pq "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type Type uint32 + +const ( + Error Type = iota + NoError +) + +// internal worker protocol (jobs mode) +type protocol struct { + // message type, see Type + T Type `json:"type"` + // Payload + Data []byte `json:"data"` +} + +type errorResp struct { + Msg string `json:"message"` + Requeue bool `json:"requeue"` + Delay uint32 `json:"delay_seconds"` +} + +func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error { + const op = errors.Op("jobs_handle_response") + // TODO(rustatian) to sync.Pool + p := &protocol{} + err := json.Unmarshal(resp, p) + if err != nil { + return errors.E(op, err) + } + + switch p.T { + case Error: + // TODO(rustatian) to sync.Pool + er := &errorResp{} + err = json.Unmarshal(p.Data, er) + if err != nil { + return errors.E(op, err) + } + + log.Error("error protocol type", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue) + + if er.Requeue { + err = jb.Requeue(er.Delay) + if err != nil { + return errors.E(op, err) + } + return nil + } + + case NoError: + err = jb.Ack() + if err != nil { + return errors.E(op, err) + } + } + + return nil +} |