diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/drivers/beanstalk | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 18 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 27 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/requeue.go | 24 |
3 files changed, 33 insertions, 36 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 21b05b16..f41a2c8a 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config var pipeCfg Config var globalCfg GlobalCfg + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(configKey, &pipeCfg) if err != nil { return nil, errors.E(op, err) @@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } @@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // PARSE CONFIGURATION ------- var globalCfg GlobalCfg + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + err := cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) @@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - jc.requeueListener() - return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index a5aa1791..47336b43 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -2,12 +2,12 @@ package beanstalk import ( "bytes" + "context" "encoding/gob" "time" "github.com/beanstalkd/go-beanstalk" json "github.com/json-iterator/go" - "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -53,7 +53,7 @@ type Options struct { // Private ================ id uint64 conn *beanstalk.Conn - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // delete old job + err = i.Options.conn.Delete(i.Options.id) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { } out.Options.conn = j.pool.conn out.Options.id = id - out.Options.requeueCh = j.requeueCh + out.Options.requeueFn = j.handleItem return nil } diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go deleted file mode 100644 index 21053940..00000000 --- a/plugins/jobs/drivers/beanstalk/requeue.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} |