summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go4
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go22
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go30
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/requeue.go24
5 files changed, 69 insertions, 13 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index ae223f39..32ca4188 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -61,6 +61,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
cp.RLock()
defer cp.RUnlock()
+ // TODO(rustatian): redial based on the token
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
@@ -82,7 +83,6 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
//
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-
func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
@@ -126,7 +126,7 @@ func (cp *ConnPool) redial() error {
cp.Lock()
// backoff here
expb := backoff.NewExponentialBackOff()
- // TODO set via config
+ // TODO(rustatian) set via config
expb.MaxElapsedTime = time.Minute
operation := func() error {
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 54c8318b..b57b22ac 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -36,7 +36,8 @@ type JobConsumer struct {
tubePriority uint32
priority int64
- stopCh chan struct{}
+ stopCh chan struct{}
+ requeueCh chan *Item
}
func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
- reconnectCh: make(chan struct{}),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
@@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
@@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- item := fromJob(jb)
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+ const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
bb.Grow(64)
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 7c792b46..91dbf41c 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -7,6 +7,7 @@ import (
"github.com/beanstalkd/go-beanstalk"
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -40,12 +41,19 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ // Reserve defines for how broker should wait until treating job are failed.
+ // - <ttr> -- time to run -- is an integer number of seconds to allow a worker
+ // to run this job. This time is counted from the moment a worker reserves
+ // this job. If the worker does not delete, release, or bury the job within
+ // <ttr> seconds, the job will time out and the server will release the job.
+ // The minimum ttr is 1. If the client sends 0, the server will silently
+ // increase the ttr to 1. Maximum ttr is 2**32-1.
Timeout int64 `json:"timeout,omitempty"`
// Private ================
- id uint64
- conn *beanstalk.Conn
+ id uint64
+ conn *beanstalk.Conn
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -103,8 +111,15 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(_ int64) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
func fromJob(job *job.Job) *Item {
@@ -131,13 +146,14 @@ func (i *Item) pack(b *bytes.Buffer) error {
return nil
}
-func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error {
+func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
return err
}
- out.Options.conn = conn
+ out.Options.conn = j.pool.conn
out.Options.id = id
+ out.Options.requeueCh = j.requeueCh
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index aaf635b1..f1385e70 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -26,7 +26,7 @@ func (j *JobConsumer) listen() {
}
item := &Item{}
- err = unpack(id, body, j.pool.conn, item)
+ err = j.unpack(id, body, item)
if err != nil {
j.log.Error("beanstalk unpack item", "error", err)
continue
diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go
new file mode 100644
index 00000000..21053940
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/requeue.go
@@ -0,0 +1,24 @@
+package beanstalk
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}