summaryrefslogtreecommitdiff
path: root/plugins/beanstalk
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/beanstalk')
-rw-r--r--plugins/beanstalk/config.go53
-rw-r--r--plugins/beanstalk/connection.go223
-rw-r--r--plugins/beanstalk/consumer.go374
-rw-r--r--plugins/beanstalk/encode_test.go75
-rw-r--r--plugins/beanstalk/item.go138
-rw-r--r--plugins/beanstalk/listen.go39
-rw-r--r--plugins/beanstalk/plugin.go47
7 files changed, 0 insertions, 949 deletions
diff --git a/plugins/beanstalk/config.go b/plugins/beanstalk/config.go
deleted file mode 100644
index a8069f5d..00000000
--- a/plugins/beanstalk/config.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package beanstalk
-
-import (
- "time"
-
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- tubePriority string = "tube_priority"
- tube string = "tube"
- reserveTimeout string = "reserve_timeout"
-)
-
-type GlobalCfg struct {
- Addr string `mapstructure:"addr"`
- Timeout time.Duration `mapstructure:"timeout"`
-}
-
-func (c *GlobalCfg) InitDefault() {
- if c.Addr == "" {
- c.Addr = "tcp://127.0.0.1:11300"
- }
-
- if c.Timeout == 0 {
- c.Timeout = time.Second * 30
- }
-}
-
-type Config struct {
- PipePriority int64 `mapstructure:"priority"`
- TubePriority *uint32 `mapstructure:"tube_priority"`
- Tube string `mapstructure:"tube"`
- ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
-}
-
-func (c *Config) InitDefault() {
- if c.Tube == "" {
- c.Tube = "default"
- }
-
- if c.ReserveTimeout == 0 {
- c.ReserveTimeout = time.Second * 1
- }
-
- if c.TubePriority == nil {
- c.TubePriority = utils.Uint32(0)
- }
-
- if c.PipePriority == 0 {
- c.PipePriority = 10
- }
-}
diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go
deleted file mode 100644
index d3241b37..00000000
--- a/plugins/beanstalk/connection.go
+++ /dev/null
@@ -1,223 +0,0 @@
-package beanstalk
-
-import (
- "context"
- "net"
- "sync"
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type ConnPool struct {
- sync.RWMutex
-
- log logger.Logger
-
- conn *beanstalk.Conn
- connT *beanstalk.Conn
- ts *beanstalk.TubeSet
- t *beanstalk.Tube
-
- network string
- address string
- tName string
- tout time.Duration
-}
-
-func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
- connT, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- connTS, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- tube := beanstalk.NewTube(connT, tName)
- ts := beanstalk.NewTubeSet(connTS, tName)
-
- return &ConnPool{
- log: log,
- network: network,
- address: address,
- tName: tName,
- tout: tout,
- conn: connTS,
- connT: connT,
- ts: ts,
- t: tube,
- }, nil
-}
-
-// Put the payload
-// TODO use the context ??
-func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- // TODO(rustatian): redial based on the token
- id, err := cp.t.Put(body, pri, delay, ttr)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry put only when we redialed
- return cp.t.Put(body, pri, delay, ttr)
- }
- }
-
- return id, nil
-}
-
-// Reserve reserves and returns a job from one of the tubes in t. If no
-// job is available before time timeout has passed, Reserve returns a
-// ConnError recording ErrTimeout.
-//
-// Typically, a client will reserve a job, perform some work, then delete
-// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- id, body, err := cp.ts.Reserve(reserveTimeout)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Reserve only when we redialed
- return cp.ts.Reserve(reserveTimeout)
- }
- }
-
- return id, body, nil
-}
-
-func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
- cp.RLock()
- defer cp.RUnlock()
-
- err := cp.conn.Delete(id)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Delete only when we redialed
- return cp.conn.Delete(id)
- }
- }
- return nil
-}
-
-func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- stat, err := cp.conn.Stats()
- if err != nil {
- errR := cp.checkAndRedial(err)
- if errR != nil {
- return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
- } else {
- return cp.conn.Stats()
- }
- }
-
- return stat, nil
-}
-
-func (cp *ConnPool) redial() error {
- const op = errors.Op("connection_pool_redial")
-
- cp.Lock()
- // backoff here
- expb := backoff.NewExponentialBackOff()
- // TODO(rustatian) set via config
- expb.MaxElapsedTime = time.Minute
-
- operation := func() error {
- connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
- if connT == nil {
- return errors.E(op, errors.Str("connectionT is nil"))
- }
-
- connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
-
- if connTS == nil {
- return errors.E(op, errors.Str("connectionTS is nil"))
- }
-
- cp.t = beanstalk.NewTube(connT, cp.tName)
- cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
- cp.conn = connTS
- cp.connT = connT
-
- cp.log.Info("beanstalk redial was successful")
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- cp.Unlock()
- return retryErr
- }
- cp.Unlock()
-
- return nil
-}
-
-var connErrors = map[string]struct{}{"EOF": {}}
-
-func (cp *ConnPool) checkAndRedial(err error) error {
- const op = errors.Op("connection_pool_check_redial")
- switch et := err.(type) { //nolint:gocritic
- // check if the error
- case beanstalk.ConnError:
- switch bErr := et.Err.(type) {
- case *net.OpError:
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
- }
-
- // if redial was successful -> continue listening
- return nil
- default:
- if _, ok := connErrors[et.Err.Error()]; ok {
- // if error is related to the broken connection - redial
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", err, errR))
- }
- // if redial was successful -> continue listening
- return nil
- }
- }
- }
-
- // return initial error
- return err
-}
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
deleted file mode 100644
index 30807f03..00000000
--- a/plugins/beanstalk/consumer.go
+++ /dev/null
@@ -1,374 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "context"
- "encoding/gob"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type consumer struct {
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
-
- pipeline atomic.Value
- listeners uint32
-
- // beanstalk
- pool *ConnPool
- addr string
- network string
- reserveTimeout time.Duration
- reconnectCh chan struct{}
- tout time.Duration
- // tube name
- tName string
- tubePriority *uint32
- priority int64
-
- stopCh chan struct{}
- requeueCh chan *Item
-}
-
-func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_beanstalk_consumer")
-
- // PARSE CONFIGURATION -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
- }
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- dsn := strings.Split(globalCfg.Addr, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
- }
-
- cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jc := &consumer{
- pq: pq,
- log: log,
- eh: e,
- pool: cPool,
- network: dsn[0],
- addr: dsn[1],
- tout: globalCfg.Timeout,
- tName: pipeCfg.Tube,
- reserveTimeout: pipeCfg.ReserveTimeout,
- tubePriority: pipeCfg.TubePriority,
- priority: pipeCfg.PipePriority,
-
- // buffered with two because jobs root plugin can call Stop at the same time as Pause
- stopCh: make(chan struct{}, 2),
- requeueCh: make(chan *Item, 1000),
- reconnectCh: make(chan struct{}, 2),
- }
-
- return jc, nil
-}
-
-func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_beanstalk_consumer")
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
- }
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- dsn := strings.Split(globalCfg.Addr, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
- }
-
- cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // initialize job consumer
- jc := &consumer{
- pq: pq,
- log: log,
- eh: e,
- pool: cPool,
- network: dsn[0],
- addr: dsn[1],
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, "default"),
- reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
- priority: pipe.Priority(),
-
- // buffered with two because jobs root plugin can call Stop at the same time as Pause
- stopCh: make(chan struct{}, 2),
- requeueCh: make(chan *Item, 1000),
- reconnectCh: make(chan struct{}, 2),
- }
-
- return jc, nil
-}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
- const op = errors.Op("beanstalk_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != jb.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
- }
-
- err := j.handleItem(ctx, fromJob(jb))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *consumer) handleItem(ctx context.Context, item *Item) error {
- const op = errors.Op("beanstalk_handle_item")
-
- bb := new(bytes.Buffer)
- bb.Grow(64)
- err := gob.NewEncoder(bb).Encode(item)
- if err != nil {
- return errors.E(op, err)
- }
-
- body := make([]byte, bb.Len())
- copy(body, bb.Bytes())
- bb.Reset()
- bb = nil
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
- // <pri> is an integer < 2**32. Jobs with smaller priority values will be
- // scheduled before jobs with larger priorities. The most urgent priority is 0;
- // the least urgent priority is 4,294,967,295.
- //
- // <delay> is an integer number of seconds to wait before putting the job in
- // the ready queue. The job will be in the "delayed" state during this time.
- // Maximum delay is 2**32-1.
- //
- // <ttr> -- time to run -- is an integer number of seconds to allow a worker
- // to run this job. This time is counted from the moment a worker reserves
- // this job. If the worker does not delete, release, or bury the job within
- // <ttr> seconds, the job will time out and the server will release the job.
- // The minimum ttr is 1. If the client sends 0, the server will silently
- // increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
- if err != nil {
- errD := j.pool.Delete(ctx, id)
- if errD != nil {
- return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
- }
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- // register the pipeline
- j.pipeline.Store(p)
- return nil
-}
-
-// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("beanstalk_state")
- stat, err := j.pool.Stats(ctx)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- out := &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: j.tName,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
- }
-
- // set stat, skip errors (replace with 0)
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
- if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
- out.Active = int64(v)
- }
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
- if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
- // this is not an error, reserved in beanstalk behaves like an active jobs
- out.Reserved = int64(v)
- }
-
- // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
- if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
- out.Delayed = int64(v)
- }
-
- return out, nil
-}
-
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("beanstalk_run")
- start := time.Now()
-
- // load atomic value
- // check if the pipeline registered
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
- }
-
- atomic.AddUint32(&j.listeners, 1)
-
- go j.listen()
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- return nil
-}
-
-func (j *consumer) Stop(context.Context) error {
- start := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- if atomic.LoadUint32(&j.listeners) == 1 {
- j.stopCh <- struct{}{}
- }
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- return nil
-}
-
-func (j *consumer) Pause(_ context.Context, p string) {
- start := time.Now()
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&j.listeners, ^uint32(0))
-
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-}
-
-func (j *consumer) Resume(_ context.Context, p string) {
- start := time.Now()
- // load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
- return
- }
-
- l := atomic.LoadUint32(&j.listeners)
- // no active listeners
- if l == 1 {
- j.log.Warn("sqs listener already in the active state")
- return
- }
-
- // start listener
- go j.listen()
-
- // increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-}
-
-func ready(r uint32) bool {
- return r > 0
-}
diff --git a/plugins/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go
deleted file mode 100644
index e43207eb..00000000
--- a/plugins/beanstalk/encode_test.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "crypto/rand"
- "encoding/gob"
- "testing"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-func BenchmarkEncodeGob(b *testing.B) {
- tb := make([]byte, 1024*10)
- _, err := rand.Read(tb)
- if err != nil {
- b.Fatal(err)
- }
-
- item := &Item{
- Job: "/super/test/php/class/loooooong",
- Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
- Payload: utils.AsString(tb),
- Headers: map[string][]string{"Test": {"test1", "test2"}},
- Options: &Options{
- Priority: 10,
- Pipeline: "test-local-pipe",
- Delay: 10,
- },
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- bb := new(bytes.Buffer)
- err := gob.NewEncoder(bb).Encode(item)
- if err != nil {
- b.Fatal(err)
- }
- _ = bb.Bytes()
- bb.Reset()
- }
-}
-
-func BenchmarkEncodeJsonIter(b *testing.B) {
- tb := make([]byte, 1024*10)
- _, err := rand.Read(tb)
- if err != nil {
- b.Fatal(err)
- }
-
- item := &Item{
- Job: "/super/test/php/class/loooooong",
- Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
- Payload: utils.AsString(tb),
- Headers: map[string][]string{"Test": {"test1", "test2"}},
- Options: &Options{
- Priority: 10,
- Pipeline: "test-local-pipe",
- Delay: 10,
- },
- }
-
- b.ResetTimer()
- b.ReportAllocs()
-
- for i := 0; i < b.N; i++ {
- bb, err := json.Marshal(item)
- if err != nil {
- b.Fatal(err)
- }
- _ = bb
- }
-}
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go
deleted file mode 100644
index 03060994..00000000
--- a/plugins/beanstalk/item.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "context"
- "encoding/gob"
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type Item struct {
- // Job contains pluginName of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Ident is unique identifier of the job, should be provided from outside
- Ident string `json:"id"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Headers with key-values pairs
- Headers map[string][]string `json:"headers"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Priority is job priority, default - 10
- // pointer to distinguish 0 as a priority and nil as priority not set
- Priority int64 `json:"priority"`
-
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int64 `json:"delay,omitempty"`
-
- // Private ================
- id uint64
- conn *beanstalk.Conn
- requeueFn func(context.Context, *Item) error
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-func (i *Item) ID() string {
- return i.Ident
-}
-
-func (i *Item) Priority() int64 {
- return i.Options.Priority
-}
-
-// Body packs job payload into binary payload.
-func (i *Item) Body() []byte {
- return utils.AsBytes(i.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-// Not used in the sqs, MessageAttributes used instead
-func (i *Item) Context() ([]byte, error) {
- ctx, err := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- Headers map[string][]string `json:"headers"`
- Pipeline string `json:"pipeline"`
- }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
- )
-
- if err != nil {
- return nil, err
- }
-
- return ctx, nil
-}
-
-func (i *Item) Ack() error {
- return i.Options.conn.Delete(i.Options.id)
-}
-
-func (i *Item) Nack() error {
- return i.Options.conn.Delete(i.Options.id)
-}
-
-func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- // overwrite the delay
- i.Options.Delay = delay
- i.Headers = headers
-
- err := i.Options.requeueFn(context.Background(), i)
- if err != nil {
- return err
- }
-
- // delete old job
- err = i.Options.conn.Delete(i.Options.id)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func fromJob(job *job.Job) *Item {
- return &Item{
- Job: job.Job,
- Ident: job.Ident,
- Payload: job.Payload,
- Headers: job.Headers,
- Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- },
- }
-}
-
-func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
- err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
- if err != nil {
- return err
- }
- out.Options.conn = j.pool.conn
- out.Options.id = id
- out.Options.requeueFn = j.handleItem
-
- return nil
-}
diff --git a/plugins/beanstalk/listen.go b/plugins/beanstalk/listen.go
deleted file mode 100644
index 6bb159ea..00000000
--- a/plugins/beanstalk/listen.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package beanstalk
-
-import (
- "github.com/beanstalkd/go-beanstalk"
-)
-
-func (j *consumer) listen() {
- for {
- select {
- case <-j.stopCh:
- j.log.Warn("beanstalk listener stopped")
- return
- default:
- id, body, err := j.pool.Reserve(j.reserveTimeout)
- if err != nil {
- if errB, ok := err.(beanstalk.ConnError); ok {
- switch errB.Err { //nolint:gocritic
- case beanstalk.ErrTimeout:
- j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
- continue
- }
- }
- // in case of other error - continue
- j.log.Error("beanstalk reserve", "error", err)
- continue
- }
-
- item := &Item{}
- err = j.unpack(id, body, item)
- if err != nil {
- j.log.Error("beanstalk unpack item", "error", err)
- continue
- }
-
- // insert job into the priority queue
- j.pq.Insert(item)
- }
- }
-}
diff --git a/plugins/beanstalk/plugin.go b/plugins/beanstalk/plugin.go
deleted file mode 100644
index 529d1474..00000000
--- a/plugins/beanstalk/plugin.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- pluginName string = "beanstalk"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- return make(chan error)
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
-func (p *Plugin) Name() string {
- return pluginName
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
-}
-
-func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipe, p.log, p.cfg, eh, pq)
-}