summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 17:26:42 +0300
committerGitHub <[email protected]>2021-08-18 17:26:42 +0300
commit324407b3e2d779143be65872993c4d091abb1d38 (patch)
treee6f0bd64241ab2d4dc05809128c8e8d7d74cbcc4 /plugins/jobs/drivers/amqp/consumer.go
parenta5435be8ab58bd23f1c2d3afd4484dd1d86b6002 (diff)
parenteb70b89cb2f23ccd44b91bbcac7438a05a40c801 (diff)
#764: feat(stat): `job` plugin drivers statistic
#764: feat(stat): `job` plugin drivers statistic
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go194
1 files changed, 115 insertions, 79 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..95df02ec 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,10 +12,12 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -50,9 +52,8 @@ type JobConsumer struct {
multipleAck bool
requeueOnFail bool
- delayCache map[string]struct{}
-
listeners uint32
+ delayed *int64
stopCh chan struct{}
}
@@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
stopCh: make(chan struct{}),
// TODO to config
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
@@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
consumeID: uuid.NewString(),
stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
@@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- j.delayCache[tmpQ] = struct{}{}
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
@@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-j.publishChan:
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(j.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error {
})
return nil
}
+
+// handleItem
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(j.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}