diff options
author | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
commit | ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch) | |
tree | 0a58b043605ef1d9b09e75b207c236aacb1ed55a /plugins/jobs/drivers/beanstalk/consumer.go | |
parent | bd0da830ae345e1ed4a67782bf413673beeba037 (diff) |
Update to go 1.17
Add Stat with tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 38 |
1 files changed, 36 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 7e81e6d9..b4d76d38 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "strconv" "strings" "sync/atomic" "time" @@ -220,8 +221,41 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { return nil } +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { - return nil, nil + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + // this is not an error, ready in terms of beanstalk means reserved in the tube + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil } func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { @@ -265,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { |