diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/drivers/sqs/consumer.go | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 9 |
1 files changed, 1 insertions, 8 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 8d93b12c..5d741358 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,8 +50,7 @@ type JobConsumer struct { client *sqs.Client queueURL *string - requeueCh chan *Item - pauseCh chan struct{} + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } @@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } |