diff options
author | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-01 19:12:40 +0300 |
commit | c90c11b92e229280477a9b049e65ca1048825dd4 (patch) | |
tree | 2a38695cad6dc3095b291575cfb40bc56820d86d /plugins/jobs/drivers/beanstalk/consumer.go | |
parent | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff) |
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel.
Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1.
Replace third-party amqp091 with the official implementation.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 90da3801..54c8318b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error return nil } -func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - go j.listen(ctx) + go j.listen() j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p string) { } // start listener - go j.listen(ctx) + go j.listen() // increase num of listeners atomic.AddUint32(&j.listeners, 1) |