diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 17:26:42 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-18 17:26:42 +0300 |
commit | 324407b3e2d779143be65872993c4d091abb1d38 (patch) | |
tree | e6f0bd64241ab2d4dc05809128c8e8d7d74cbcc4 /plugins/jobs/drivers/amqp | |
parent | a5435be8ab58bd23f1c2d3afd4484dd1d86b6002 (diff) | |
parent | eb70b89cb2f23ccd44b91bbcac7438a05a40c801 (diff) |
#764: feat(stat): `job` plugin drivers statistic
#764: feat(stat): `job` plugin drivers statistic
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 194 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 15 |
2 files changed, 128 insertions, 81 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 429953e1..95df02ec 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -12,10 +12,12 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type JobConsumer struct { @@ -50,9 +52,8 @@ type JobConsumer struct { multipleAck bool requeueOnFail bool - delayCache map[string]struct{} - listeners uint32 + delayed *int64 stopCh chan struct{} } @@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh: make(chan struct{}), // TODO to config retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), priority: pipeCfg.Priority, + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeCfg.RoutingKey, @@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con consumeID: uuid.NewString(), stopCh: make(chan struct{}), retryTimeout: time.Minute * 5, - delayCache: make(map[string]struct{}, 100), + delayed: utils.Int64(0), publishChan: make(chan *amqp.Channel, 1), routingKey: pipeline.String(routingKey, ""), @@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -// handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - const op = errors.Op("rabbitmq_handle_item") - select { - case pch := <-j.publishChan: - // return the channel back - defer func() { - j.publishChan <- pch - }() - - // convert - table, err := pack(msg.ID(), msg) - if err != nil { - return errors.E(op, err) - } - - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - // TODO dlx cache channel?? - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } - - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - j.delayCache[tmpQ] = struct{}{} - - return nil - } - - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} - func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { j.pipeline.Store(p) return nil @@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("amqp_driver_state") + select { + case pch := <-j.publishChan: + defer func() { + j.publishChan <- pch + }() + + q, err := pch.QueueInspect(j.queue) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: q.Name, + Active: int64(q.Messages), + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), + }, nil + + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } +} + func (j *JobConsumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error { }) return nil } + +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + table, err := pack(msg.ID(), msg) + if err != nil { + return errors.E(op, err) + } + + const op = errors.Op("rabbitmq_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + atomic.AddInt64(j.delayed, 1) + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + atomic.AddInt64(j.delayed, ^int64(0)) + return errors.E(op, err) + } + + return nil + } + + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 5990d137..623dcca7 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -3,6 +3,7 @@ package amqp import ( "context" "fmt" + "sync/atomic" "time" json "github.com/json-iterator/go" @@ -52,8 +53,8 @@ type Options struct { nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error - + requeueFn func(context.Context, *Item) error + delayed *int64 multipleAsk bool requeue bool } @@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.ack(i.Options.multipleAsk) } func (i *Item) Nack() error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } return i.Options.nack(false, i.Options.requeue) } // Requeue with the provided delay, handled by the Nack func (i *Item) Requeue(headers map[string][]string, delay int64) error { + if i.Options.Delay > 0 { + atomic.AddInt64(i.Options.delayed, ^int64(0)) + } // overwrite the delay i.Options.Delay = delay i.Headers = headers @@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack + item.Options.delayed = j.delayed // requeue func item.Options.requeueFn = j.handleItem |