diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 00:45:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 00:45:13 +0300 |
commit | e4b955135692bc2bc0fd712b6779d83d32fbd17c (patch) | |
tree | b3fe2d03ca51d89f034fef9ce1e4148d4acb8b5a /plugins/jobs/drivers/beanstalk/consumer.go | |
parent | b2da831f47284974551710d2767a7bdde0efa51d (diff) |
Update beanstalk redial algo.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 10 |
1 files changed, 2 insertions, 8 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index cce85c99..1c2e9781 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -66,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) if err != nil { return nil, errors.E(op, err) } @@ -90,9 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}), } - // start redial listener - go jc.redial() - return jc, nil } @@ -121,7 +118,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) if err != nil { return nil, errors.E(op, err) } @@ -145,9 +142,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - // start redial listener - go jc.redial() - return jc, nil } func (j *JobConsumer) Push(jb *job.Job) error { |