path: root/plugins/beanstalk
Diffstat (limited to 'plugins/beanstalk')
-rw-r--r--    plugins/beanstalk/config.go        53
-rw-r--r--    plugins/beanstalk/connection.go   223
-rw-r--r--    plugins/beanstalk/consumer.go     360
-rw-r--r--    plugins/beanstalk/encode_test.go   75
-rw-r--r--    plugins/beanstalk/item.go         147
-rw-r--r--    plugins/beanstalk/listen.go        39
-rw-r--r--    plugins/beanstalk/plugin.go        47
7 files changed, 944 insertions, 0 deletions
diff --git a/plugins/beanstalk/config.go b/plugins/beanstalk/config.go
new file mode 100644
index 00000000..a8069f5d
--- /dev/null
+++ b/plugins/beanstalk/config.go
@@ -0,0 +1,53 @@
+package beanstalk
+
+import (
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ tubePriority string = "tube_priority"
+ tube string = "tube"
+ reserveTimeout string = "reserve_timeout"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+ Timeout time.Duration `mapstructure:"timeout"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "tcp://127.0.0.1:11300"
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = time.Second * 30
+ }
+}
+
+type Config struct {
+ PipePriority int64 `mapstructure:"priority"`
+ TubePriority *uint32 `mapstructure:"tube_priority"`
+ Tube string `mapstructure:"tube"`
+ ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
+}
+
+func (c *Config) InitDefault() {
+ if c.Tube == "" {
+ c.Tube = "default"
+ }
+
+ if c.ReserveTimeout == 0 {
+ c.ReserveTimeout = time.Second * 1
+ }
+
+ if c.TubePriority == nil {
+ c.TubePriority = utils.Uint32(0)
+ }
+
+ if c.PipePriority == 0 {
+ c.PipePriority = 10
+ }
+}
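
For reference, a minimal sketch (same package; the function name is illustrative) of the values the two InitDefault methods above fill in when the corresponding config sections are empty:

    package beanstalk

    // exampleDefaults resolves empty configs the same way the plugin does.
    func exampleDefaults() (GlobalCfg, Config) {
            g := GlobalCfg{}
            g.InitDefault() // Addr -> "tcp://127.0.0.1:11300", Timeout -> 30s

            c := Config{}
            c.InitDefault() // Tube -> "default", ReserveTimeout -> 1s, *TubePriority -> 0, PipePriority -> 10

            return g, c
    }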
diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go
new file mode 100644
index 00000000..d3241b37
--- /dev/null
+++ b/plugins/beanstalk/connection.go
@@ -0,0 +1,223 @@
+package beanstalk
+
+import (
+ "context"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type ConnPool struct {
+ sync.RWMutex
+
+ log logger.Logger
+
+ conn *beanstalk.Conn
+ connT *beanstalk.Conn
+ ts *beanstalk.TubeSet
+ t *beanstalk.Tube
+
+ network string
+ address string
+ tName string
+ tout time.Duration
+}
+
+func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
+ connT, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ connTS, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ tube := beanstalk.NewTube(connT, tName)
+ ts := beanstalk.NewTubeSet(connTS, tName)
+
+ return &ConnPool{
+ log: log,
+ network: network,
+ address: address,
+ tName: tName,
+ tout: tout,
+ conn: connTS,
+ connT: connT,
+ ts: ts,
+ t: tube,
+ }, nil
+}
+
+// Put puts the payload into the configured tube.
+// TODO use the context ??
+func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ // TODO(rustatian): redial based on the token
+ id, err := cp.t.Put(body, pri, delay, ttr)
+ if err != nil {
+ // errN contains both the original err and the internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
+ } else {
+ // retry put only when we redialed
+ return cp.t.Put(body, pri, delay, ttr)
+ }
+ }
+
+ return id, nil
+}
+
+// Reserve reserves and returns a job from one of the tubes in the tube set. If no
+// job is available before time timeout has passed, Reserve returns a
+// ConnError recording ErrTimeout.
+//
+// Typically, a client will reserve a job, perform some work, then delete
+// the job with Conn.Delete.
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ id, body, err := cp.ts.Reserve(reserveTimeout)
+ if err != nil {
+ // errN contains both the original err and the internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
+ } else {
+ // retry Reserve only when we redialed
+ return cp.ts.Reserve(reserveTimeout)
+ }
+ }
+
+ return id, body, nil
+}
+
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ err := cp.conn.Delete(id)
+ if err != nil {
+ // errN contains both the original err and the internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return errors.Errorf("err: %s\nerr redial: %s", err, errN)
+ } else {
+ // retry Delete only when we redialed
+ return cp.conn.Delete(id)
+ }
+ }
+ return nil
+}
+
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ stat, err := cp.conn.Stats()
+ if err != nil {
+ errR := cp.checkAndRedial(err)
+ if errR != nil {
+ return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+ } else {
+ return cp.conn.Stats()
+ }
+ }
+
+ return stat, nil
+}
+
+func (cp *ConnPool) redial() error {
+ const op = errors.Op("connection_pool_redial")
+
+ cp.Lock()
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // TODO(rustatian) set via config
+ expb.MaxElapsedTime = time.Minute
+
+ operation := func() error {
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+
+ cp.log.Info("beanstalk redial was successful")
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ cp.Unlock()
+ return retryErr
+ }
+ cp.Unlock()
+
+ return nil
+}
+
+var connErrors = map[string]struct{}{"EOF": {}}
+
+func (cp *ConnPool) checkAndRedial(err error) error {
+ const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) { //nolint:gocritic
+ // check if the error is a beanstalk.ConnError
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
+
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", err, errR))
+ }
+ // if redial was successful -> continue listening
+ return nil
+ }
+ }
+ }
+
+ // return initial error
+ return err
+}
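
A minimal usage sketch of the pool above, assuming a beanstalkd instance on 127.0.0.1:11300 and any plugins/logger.Logger implementation supplied by the caller (the function and payload are illustrative):

    package beanstalk

    import (
            "context"
            "time"

            "github.com/spiral/roadrunner/v2/plugins/logger"
    )

    // examplePool puts one job, reserves it back and deletes it.
    func examplePool(log logger.Logger) error {
            cp, err := NewConnPool("tcp", "127.0.0.1:11300", "default", time.Second*30, log)
            if err != nil {
                    return err
            }

            // priority 0 (most urgent), no delay, 30 seconds TTR
            _, err = cp.Put(context.Background(), []byte(`{"job":"test"}`), 0, 0, time.Second*30)
            if err != nil {
                    return err
            }

            // wait up to 5 seconds for a job to become ready
            id, _, err := cp.Reserve(time.Second * 5)
            if err != nil {
                    return err
            }

            return cp.Delete(context.Background(), id)
    }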
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
new file mode 100644
index 00000000..5ef89983
--- /dev/null
+++ b/plugins/beanstalk/consumer.go
@@ -0,0 +1,360 @@
+package beanstalk
+
+import (
+ "bytes"
+ "context"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type consumer struct {
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+
+ pipeline atomic.Value
+ listeners uint32
+
+ // beanstalk
+ pool *ConnPool
+ addr string
+ network string
+ reserveTimeout time.Duration
+ reconnectCh chan struct{}
+ tout time.Duration
+ // tube name
+ tName string
+ tubePriority *uint32
+ priority int64
+
+ stopCh chan struct{}
+ requeueCh chan *Item
+}
+
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &consumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipeCfg.Tube,
+ reserveTimeout: pipeCfg.ReserveTimeout,
+ tubePriority: pipeCfg.TubePriority,
+ priority: pipeCfg.PipePriority,
+
+ // buffered with 2 because the jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
+ }
+
+ return jc, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &consumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, "default"),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
+ priority: pipe.Priority(),
+
+ // buffered with 2 because the jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
+ }
+
+ return jc, nil
+}
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+ const op = errors.Op("beanstalk_push")
+ // check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *consumer) handleItem(ctx context.Context, item *Item) error {
+ const op = errors.Op("beanstalk_handle_item")
+
+ bb := new(bytes.Buffer)
+ bb.Grow(64)
+ err := item.pack(bb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
+ // <pri> is an integer < 2**32. Jobs with smaller priority values will be
+ // scheduled before jobs with larger priorities. The most urgent priority is 0;
+ // the least urgent priority is 4,294,967,295.
+ //
+ // <delay> is an integer number of seconds to wait before putting the job in
+ // the ready queue. The job will be in the "delayed" state during this time.
+ // Maximum delay is 2**32-1.
+ //
+ // <ttr> -- time to run -- is an integer number of seconds to allow a worker
+ // to run this job. This time is counted from the moment a worker reserves
+ // this job. If the worker does not delete, release, or bury the job within
+ // <ttr> seconds, the job will time out and the server will release the job.
+ // The minimum ttr is 1. If the client sends 0, the server will silently
+ // increase the ttr to 1. Maximum ttr is 2**32-1.
+ id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ if err != nil {
+ errD := j.pool.Delete(ctx, id)
+ if errD != nil {
+ return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
+ }
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ // register the pipeline
+ j.pipeline.Store(p)
+ return nil
+}
+
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+ // this is not an error; reserved jobs in beanstalk behave like active jobs
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
+}
+
+func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("beanstalk_run")
+ // check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
+ }
+
+ atomic.AddUint32(&j.listeners, 1)
+
+ go j.listen()
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *consumer) Stop(context.Context) error {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ j.stopCh <- struct{}{}
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *consumer) Pause(_ context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ j.stopCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (j *consumer) Resume(_ context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+
+ // increase num of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
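
A sketch of wiring this consumer by hand, assuming the caller already owns the usual RoadRunner dependencies (config.Configurer, logger, events.Handler, priority queue, pipeline); the configuration key and function name are hypothetical:

    package beanstalk

    import (
            "context"

            "github.com/spiral/roadrunner/v2/pkg/events"
            priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
            "github.com/spiral/roadrunner/v2/plugins/config"
            "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
            "github.com/spiral/roadrunner/v2/plugins/logger"
    )

    // exampleConsumer constructs a consumer for one pipeline, registers it and starts listening.
    func exampleConsumer(cfg config.Configurer, log logger.Logger, eh events.Handler, pq priorityqueue.Queue, pipe *pipeline.Pipeline) error {
            // "jobs.pipelines.example.config" is a hypothetical config key
            c, err := NewBeanstalkConsumer("jobs.pipelines.example.config", log, cfg, eh, pq)
            if err != nil {
                    return err
            }

            if err := c.Register(context.Background(), pipe); err != nil {
                    return err
            }

            // Run starts the background listener for the registered pipeline
            return c.Run(context.Background(), pipe)
    }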
diff --git a/plugins/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go
new file mode 100644
index 00000000..e43207eb
--- /dev/null
+++ b/plugins/beanstalk/encode_test.go
@@ -0,0 +1,75 @@
+package beanstalk
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/gob"
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+func BenchmarkEncodeGob(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb := new(bytes.Buffer)
+ err := gob.NewEncoder(bb).Encode(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb.Bytes()
+ bb.Reset()
+ }
+}
+
+func BenchmarkEncodeJsonIter(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb, err := json.Marshal(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb
+ }
+}
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go
new file mode 100644
index 00000000..0a6cd560
--- /dev/null
+++ b/plugins/beanstalk/item.go
@@ -0,0 +1,147 @@
+package beanstalk
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains the name of the target job handler (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job; it should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to the job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains a set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle a given job.
+type Options struct {
+ // Priority is the job priority (default: 10)
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Private ================
+ id uint64
+ conn *beanstalk.Conn
+ requeueFn func(context.Context, *Item) error
+}
+
+// DelayDuration returns the delay as a time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs the job context (id, job, headers, pipeline) into a binary payload.
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ return i.Options.conn.Delete(i.Options.id)
+}
+
+func (i *Item) Nack() error {
+ return i.Options.conn.Delete(i.Options.id)
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
+
+func (i *Item) pack(b *bytes.Buffer) error {
+ err := gob.NewEncoder(b).Encode(i)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
+ err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
+ if err != nil {
+ return err
+ }
+ out.Options.conn = j.pool.conn
+ out.Options.id = id
+ out.Options.requeueFn = j.handleItem
+
+ return nil
+}
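
A gob round-trip sketch mirroring pack and the consumer's unpack above (same package; unpack additionally attaches the connection, job id and requeue callback, which this standalone example skips):

    package beanstalk

    import (
            "bytes"
            "encoding/gob"
    )

    // examplePackRoundTrip encodes an Item the way handleItem does before Put,
    // then decodes it back the way unpack does after Reserve.
    func examplePackRoundTrip() (*Item, error) {
            src := &Item{
                    Job:     "some/php/Class",
                    Ident:   "job-1",
                    Payload: `{"hello":"world"}`,
                    Headers: map[string][]string{"X-Test": {"1"}},
                    Options: &Options{Priority: 10, Pipeline: "example", Delay: 0},
            }

            bb := new(bytes.Buffer)
            if err := src.pack(bb); err != nil {
                    return nil, err
            }

            out := &Item{}
            if err := gob.NewDecoder(bb).Decode(out); err != nil {
                    return nil, err
            }

            return out, nil
    }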
diff --git a/plugins/beanstalk/listen.go b/plugins/beanstalk/listen.go
new file mode 100644
index 00000000..6bb159ea
--- /dev/null
+++ b/plugins/beanstalk/listen.go
@@ -0,0 +1,39 @@
+package beanstalk
+
+import (
+ "github.com/beanstalkd/go-beanstalk"
+)
+
+func (j *consumer) listen() {
+ for {
+ select {
+ case <-j.stopCh:
+ j.log.Warn("beanstalk listener stopped")
+ return
+ default:
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
+ if err != nil {
+ if errB, ok := err.(beanstalk.ConnError); ok {
+ switch errB.Err { //nolint:gocritic
+ case beanstalk.ErrTimeout:
+ j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
+ continue
+ }
+ }
+ // in case of other error - continue
+ j.log.Error("beanstalk reserve", "error", err)
+ continue
+ }
+
+ item := &Item{}
+ err = j.unpack(id, body, item)
+ if err != nil {
+ j.log.Error("beanstalk unpack item", "error", err)
+ continue
+ }
+
+ // insert job into the priority queue
+ j.pq.Insert(item)
+ }
+ }
+}
diff --git a/plugins/beanstalk/plugin.go b/plugins/beanstalk/plugin.go
new file mode 100644
index 00000000..529d1474
--- /dev/null
+++ b/plugins/beanstalk/plugin.go
@@ -0,0 +1,47 @@
+package beanstalk
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "beanstalk"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, eh, pq)
+}