diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 429953e1..b89cdc82 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -361,6 +362,29 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } +func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("amqp_driver_state") + select { + case pch := <-j.publishChan: + defer func() { + j.publishChan <- pch + }() + + q, err := pch.QueueInspect(j.queue) + if err != nil { + return nil, errors.E(op, err) + } + + return &jobState.State{ + Queue: q.Name, + Active: int64(q.Messages), + }, nil + + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } +} + func (j *JobConsumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { |