summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/drivers/beanstalk
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go18
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go27
-rw-r--r--plugins/jobs/drivers/beanstalk/requeue.go24
3 files changed, 33 insertions, 36 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 21b05b16..f41a2c8a 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -48,6 +48,15 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
var pipeCfg Config
var globalCfg GlobalCfg
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(configKey, &pipeCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -94,8 +103,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
@@ -105,6 +112,11 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// PARSE CONFIGURATION -------
var globalCfg GlobalCfg
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
err := cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
@@ -144,8 +156,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- jc.requeueListener()
-
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index a5aa1791..47336b43 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -2,12 +2,12 @@ package beanstalk
import (
"bytes"
+ "context"
"encoding/gob"
"time"
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
- "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -53,7 +53,7 @@ type Options struct {
// Private ================
id uint64
conn *beanstalk.Conn
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -115,12 +115,23 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
}
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
func fromJob(job *job.Job) *Item {
@@ -154,7 +165,7 @@ func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
}
out.Options.conn = j.pool.conn
out.Options.id = id
- out.Options.requeueCh = j.requeueCh
+ out.Options.requeueFn = j.handleItem
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
deleted file mode 100644
index 21053940..00000000
--- a/plugins/jobs/drivers/beanstalk/requeue.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}