summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 00:45:13 +0300
committerValery Piashchynski <[email protected]>2021-07-22 00:45:13 +0300
commite4b955135692bc2bc0fd712b6779d83d32fbd17c (patch)
treeb3fe2d03ca51d89f034fef9ce1e4148d4acb8b5a /plugins/jobs/drivers/beanstalk/consumer.go
parentb2da831f47284974551710d2767a7bdde0efa51d (diff)
Update beanstalk redial algo.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go10
1 files changed, 2 insertions, 8 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index cce85c99..1c2e9781 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -66,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
if err != nil {
return nil, errors.E(op, err)
}
@@ -90,9 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}),
}
- // start redial listener
- go jc.redial()
-
return jc, nil
}
@@ -121,7 +118,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
if err != nil {
return nil, errors.E(op, err)
}
@@ -145,9 +142,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- // start redial listener
- go jc.redial()
-
return jc, nil
}
func (j *JobConsumer) Push(jb *job.Job) error {