summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
committerValery Piashchynski <[email protected]>2021-08-01 19:12:40 +0300
commitc90c11b92e229280477a9b049e65ca1048825dd4 (patch)
tree2a38695cad6dc3095b291575cfb40bc56820d86d /plugins/jobs/drivers/beanstalk/consumer.go
parent1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (diff)
Rework vec based on the channel. Use select statement with the default
branch to handle dead workers inside the channel. Update docker-compose.yaml used for the tests. Update rabbitmq to 3.9.1. Replace third-party amqp091 with the official implementation. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 90da3801..54c8318b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -192,7 +192,7 @@ func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error
return nil
}
-func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -204,7 +204,7 @@ func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- go j.listen(ctx)
+ go j.listen()
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
@@ -260,7 +260,7 @@ func (j *JobConsumer) Pause(ctx context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -276,7 +276,7 @@ func (j *JobConsumer) Resume(ctx context.Context, p string) {
}
// start listener
- go j.listen(ctx)
+ go j.listen()
// increase num of listeners
atomic.AddUint32(&j.listeners, 1)