summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/priority_queue/binary_heap_test.go2
-rw-r--r--pkg/priority_queue/interface.go2
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go118
-rw-r--r--plugins/jobs/drivers/amqp/item.go50
-rw-r--r--plugins/jobs/drivers/amqp/redial.go3
-rw-r--r--plugins/jobs/drivers/amqp/requeue.go34
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go2
-rw-r--r--plugins/jobs/drivers/sqs/item.go2
-rw-r--r--plugins/jobs/job/general.go28
-rw-r--r--plugins/jobs/plugin.go12
-rw-r--r--plugins/jobs/protocol.go13
12 files changed, 190 insertions, 78 deletions
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
index 21167051..fa0c5c29 100644
--- a/pkg/priority_queue/binary_heap_test.go
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -20,7 +20,7 @@ func (t Test) Nack() error {
return nil
}
-func (t Test) Requeue(_ uint32) error {
+func (t Test) Requeue(_ int64) error {
return nil
}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
index eee2a090..3d192e8a 100644
--- a/pkg/priority_queue/interface.go
+++ b/pkg/priority_queue/interface.go
@@ -27,5 +27,5 @@ type Item interface {
Nack() error
// Requeue - put the message back to the queue with the optional delay
- Requeue(delay uint32) error
+ Requeue(delay int64) error
}
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 3ca5c742..36a16bcd 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,6 +54,7 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
+ requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -111,6 +112,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -133,8 +135,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
jb.publishChan <- pch
- // run redialer for the connection
+ // run redialer and requeue listener for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -181,6 +184,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, true),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -209,6 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -240,52 +245,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- }
-
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
+ err = j.handleItem(msg, p, pch)
+ if err != nil {
+ return errors.E(op, err)
+ }
- if err != nil {
- return errors.E(op, err)
- }
+ return nil
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+// handleItem
+func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
+ err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
@@ -296,28 +284,56 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
}
+ _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: p,
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
- Timestamp: time.Now(),
+ Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
+
if err != nil {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
+ // insert to the local, limited pipeline
+ err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
}
+
+ return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -475,6 +491,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
+ close(j.requeueCh)
+
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index a7e2c4e5..1a7ce00e 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -25,15 +25,6 @@ type Item struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
-
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
- AckFunc func(multiply bool) error
-
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
- // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
- // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
- // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
- NackFunc func(multiply bool, requeue bool) error
}
// Options carry information about how to handle given job.
@@ -52,6 +43,17 @@ type Options struct {
Timeout int64 `json:"timeout,omitempty"`
// private
+ // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ ack func(multiply bool) error
+
+ // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
+ // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
+ // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
+ nack func(multiply bool, requeue bool) error
+
+ requeueCh chan *Item
+
multipleAsk bool
requeue bool
}
@@ -104,16 +106,23 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- return i.AckFunc(i.Options.multipleAsk)
+ return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
- return i.NackFunc(false, i.Options.requeue)
+ return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
-func (i *Item) Requeue(_ uint32) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -123,16 +132,20 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
if err != nil {
return nil, errors.E(op, err)
}
- return &Item{
+
+ i := &Item{
Job: item.Job,
Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
Options: item.Options,
- // internal
- AckFunc: d.Ack,
- NackFunc: d.Nack,
- }, nil
+ }
+
+ item.Options.ack = d.Ack
+ item.Options.nack = d.Nack
+ // requeue channel
+ item.Options.requeueCh = j.requeueCh
+ return i, nil
}
func fromJob(job *job.Job) *Item {
@@ -172,6 +185,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
+ requeueCh: j.requeueCh,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index ef2a130a..8dc18b8f 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -24,7 +24,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
j.Lock()
- // trash the broken publish channel
+ // trash the broken publishing channel
<-j.publishChan
t := time.Now()
@@ -85,6 +85,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
return errors.E(op, err)
}
+ // put the fresh publishing channel
j.publishChan <- pch
// restart listener
j.listener(deliv)
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go
new file mode 100644
index 00000000..a2b3b26c
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/requeue.go
@@ -0,0 +1,34 @@
+package amqp
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ pch := <-j.publishChan
+
+ headers, err := pack(item.ID(), item)
+ if err != nil {
+ j.publishChan <- pch
+ j.log.Error("requeue pack", "error", err)
+ continue
+ }
+
+ err = j.handleItem(item, headers, pch)
+ if err != nil {
+ j.publishChan <- pch
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+
+ j.publishChan <- pch
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index eb532e08..7c792b46 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -103,7 +103,7 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index ebf16524..8560f10a 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -111,6 +111,6 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 50d8ac18..a3039d1b 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -156,7 +156,7 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ uint32) error {
+func (i *Item) Requeue(_ int64) error {
return nil
}
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/general.go
index 2c7d04f0..d2a27373 100644
--- a/plugins/jobs/job/general.go
+++ b/plugins/jobs/job/general.go
@@ -29,3 +29,31 @@ type Job struct {
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
+
+func (j Job) ID() string {
+ panic("implement me")
+}
+
+func (j Job) Priority() int64 {
+ panic("implement me")
+}
+
+func (j Job) Body() []byte {
+ panic("implement me")
+}
+
+func (j Job) Context() ([]byte, error) {
+ panic("implement me")
+}
+
+func (j Job) Ack() error {
+ panic("implement me")
+}
+
+func (j Job) Nack() error {
+ panic("implement me")
+}
+
+func (j Job) Requeue(delay uint32) error {
+ panic("implement me")
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 61936db2..c9bba1c2 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -195,7 +195,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
for {
select {
case <-p.stopCh:
- p.log.Debug("------> job poller stopped <------")
+ p.log.Info("------> job poller stopped <------")
return
default:
// get prioritized JOB from the queue
@@ -240,6 +240,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ // if response is nil or body is nil, just acknowledge the job
+ if resp == nil || resp.Body == nil {
+ p.putPayload(exec)
+ err = jb.Ack()
+ if err != nil {
+ p.log.Error("acknowledge error, job might be missed", "error", err)
+ continue
+ }
+ }
+
// handle the response protocol
err = handleResponse(resp.Body, jb, p.log)
if err != nil {
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
index adfd0784..e27f2868 100644
--- a/plugins/jobs/protocol.go
+++ b/plugins/jobs/protocol.go
@@ -25,22 +25,30 @@ type protocol struct {
type errorResp struct {
Msg string `json:"message"`
Requeue bool `json:"requeue"`
- Delay uint32 `json:"delay_seconds"`
+ Delay int64 `json:"delay_seconds"`
}
func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
const op = errors.Op("jobs_handle_response")
// TODO(rustatian) to sync.Pool
p := &protocol{}
+
err := json.Unmarshal(resp, p)
if err != nil {
return errors.E(op, err)
}
switch p.T {
+ // likely case
+ case NoError:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
case Error:
// TODO(rustatian) to sync.Pool
er := &errorResp{}
+
err = json.Unmarshal(p.Data, er)
if err != nil {
return errors.E(op, err)
@@ -55,8 +63,7 @@ func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
}
return nil
}
-
- case NoError:
+ default:
err = jb.Ack()
if err != nil {
return errors.E(op, err)