summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go47
1 files changed, 45 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..6323148b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +215,49 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+ // this is not an error, reserved in beanstalk behaves like an active jobs
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
+}
+
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -260,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -315,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func ready(r uint32) bool {
+ return r > 0
+}