diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 38 |
1 files changed, 36 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 7e81e6d9..b4d76d38 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "strconv" "strings" "sync/atomic" "time" @@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + // this is not an error, ready in terms of beanstalk means reserved in the tube + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { @@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { |