diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 18 |
1 files changed, 14 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 21b05b16..f41a2c8a 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config var pipeCfg Config var globalCfg GlobalCfg + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(configKey, &pipeCfg) if err != nil { return nil, errors.E(op, err) @@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } @@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // PARSE CONFIGURATION ------- var globalCfg GlobalCfg + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) @@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { |