diff options
Diffstat (limited to 'plugins/beanstalk')
-rw-r--r-- | plugins/beanstalk/config.go | 53 | ||||
-rw-r--r-- | plugins/beanstalk/connection.go | 223 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go | 374 | ||||
-rw-r--r-- | plugins/beanstalk/encode_test.go | 75 | ||||
-rw-r--r-- | plugins/beanstalk/item.go | 138 | ||||
-rw-r--r-- | plugins/beanstalk/listen.go | 39 | ||||
-rw-r--r-- | plugins/beanstalk/plugin.go | 47 |
7 files changed, 0 insertions, 949 deletions
diff --git a/plugins/beanstalk/config.go b/plugins/beanstalk/config.go deleted file mode 100644 index a8069f5d..00000000 --- a/plugins/beanstalk/config.go +++ /dev/null @@ -1,53 +0,0 @@ -package beanstalk - -import ( - "time" - - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - tubePriority string = "tube_priority" - tube string = "tube" - reserveTimeout string = "reserve_timeout" -) - -type GlobalCfg struct { - Addr string `mapstructure:"addr"` - Timeout time.Duration `mapstructure:"timeout"` -} - -func (c *GlobalCfg) InitDefault() { - if c.Addr == "" { - c.Addr = "tcp://127.0.0.1:11300" - } - - if c.Timeout == 0 { - c.Timeout = time.Second * 30 - } -} - -type Config struct { - PipePriority int64 `mapstructure:"priority"` - TubePriority *uint32 `mapstructure:"tube_priority"` - Tube string `mapstructure:"tube"` - ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` -} - -func (c *Config) InitDefault() { - if c.Tube == "" { - c.Tube = "default" - } - - if c.ReserveTimeout == 0 { - c.ReserveTimeout = time.Second * 1 - } - - if c.TubePriority == nil { - c.TubePriority = utils.Uint32(0) - } - - if c.PipePriority == 0 { - c.PipePriority = 10 - } -} diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go deleted file mode 100644 index d3241b37..00000000 --- a/plugins/beanstalk/connection.go +++ /dev/null @@ -1,223 +0,0 @@ -package beanstalk - -import ( - "context" - "net" - "sync" - "time" - - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type ConnPool struct { - sync.RWMutex - - log logger.Logger - - conn *beanstalk.Conn - connT *beanstalk.Conn - ts *beanstalk.TubeSet - t *beanstalk.Tube - - network string - address string - tName string - tout time.Duration -} - -func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { - connT, err := beanstalk.DialTimeout(network, address, tout) - if err != nil { - return nil, err - } - - connTS, err := beanstalk.DialTimeout(network, address, tout) - if err != nil { - return nil, err - } - - tube := beanstalk.NewTube(connT, tName) - ts := beanstalk.NewTubeSet(connTS, tName) - - return &ConnPool{ - log: log, - network: network, - address: address, - tName: tName, - tout: tout, - conn: connTS, - connT: connT, - ts: ts, - t: tube, - }, nil -} - -// Put the payload -// TODO use the context ?? -func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { - cp.RLock() - defer cp.RUnlock() - - // TODO(rustatian): redial based on the token - id, err := cp.t.Put(body, pri, delay, ttr) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry put only when we redialed - return cp.t.Put(body, pri, delay, ttr) - } - } - - return id, nil -} - -// Reserve reserves and returns a job from one of the tubes in t. If no -// job is available before time timeout has passed, Reserve returns a -// ConnError recording ErrTimeout. -// -// Typically, a client will reserve a job, perform some work, then delete -// the job with Conn.Delete. -func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { - cp.RLock() - defer cp.RUnlock() - - id, body, err := cp.ts.Reserve(reserveTimeout) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry Reserve only when we redialed - return cp.ts.Reserve(reserveTimeout) - } - } - - return id, body, nil -} - -func (cp *ConnPool) Delete(_ context.Context, id uint64) error { - cp.RLock() - defer cp.RUnlock() - - err := cp.conn.Delete(id) - if err != nil { - // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) - if errN != nil { - return errors.Errorf("err: %s\nerr redial: %s", err, errN) - } else { - // retry Delete only when we redialed - return cp.conn.Delete(id) - } - } - return nil -} - -func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { - cp.RLock() - defer cp.RUnlock() - - stat, err := cp.conn.Stats() - if err != nil { - errR := cp.checkAndRedial(err) - if errR != nil { - return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) - } else { - return cp.conn.Stats() - } - } - - return stat, nil -} - -func (cp *ConnPool) redial() error { - const op = errors.Op("connection_pool_redial") - - cp.Lock() - // backoff here - expb := backoff.NewExponentialBackOff() - // TODO(rustatian) set via config - expb.MaxElapsedTime = time.Minute - - operation := func() error { - connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err - } - if connT == nil { - return errors.E(op, errors.Str("connectionT is nil")) - } - - connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err - } - - if connTS == nil { - return errors.E(op, errors.Str("connectionTS is nil")) - } - - cp.t = beanstalk.NewTube(connT, cp.tName) - cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) - cp.conn = connTS - cp.connT = connT - - cp.log.Info("beanstalk redial was successful") - return nil - } - - retryErr := backoff.Retry(operation, expb) - if retryErr != nil { - cp.Unlock() - return retryErr - } - cp.Unlock() - - return nil -} - -var connErrors = map[string]struct{}{"EOF": {}} - -func (cp *ConnPool) checkAndRedial(err error) error { - const op = errors.Op("connection_pool_check_redial") - switch et := err.(type) { //nolint:gocritic - // check if the error - case beanstalk.ConnError: - switch bErr := et.Err.(type) { - case *net.OpError: - cp.RUnlock() - errR := cp.redial() - cp.RLock() - // if redial failed - return - if errR != nil { - return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) - } - - // if redial was successful -> continue listening - return nil - default: - if _, ok := connErrors[et.Err.Error()]; ok { - // if error is related to the broken connection - redial - cp.RUnlock() - errR := cp.redial() - cp.RLock() - // if redial failed - return - if errR != nil { - return errors.E(op, errors.Errorf("%v:%v", err, errR)) - } - // if redial was successful -> continue listening - return nil - } - } - } - - // return initial error - return err -} diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go deleted file mode 100644 index 30807f03..00000000 --- a/plugins/beanstalk/consumer.go +++ /dev/null @@ -1,374 +0,0 @@ -package beanstalk - -import ( - "bytes" - "context" - "encoding/gob" - "strconv" - "strings" - "sync/atomic" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type consumer struct { - log logger.Logger - eh events.Handler - pq priorityqueue.Queue - - pipeline atomic.Value - listeners uint32 - - // beanstalk - pool *ConnPool - addr string - network string - reserveTimeout time.Duration - reconnectCh chan struct{} - tout time.Duration - // tube name - tName string - tubePriority *uint32 - priority int64 - - stopCh chan struct{} - requeueCh chan *Item -} - -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_beanstalk_consumer") - - // PARSE CONFIGURATION ------- - var pipeCfg Config - var globalCfg GlobalCfg - - if !cfg.Has(configKey) { - return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) - } - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) - } - - err := cfg.UnmarshalKey(configKey, &pipeCfg) - if err != nil { - return nil, errors.E(op, err) - } - - pipeCfg.InitDefault() - - err = cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // PARSE CONFIGURATION ------- - - dsn := strings.Split(globalCfg.Addr, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) - } - - cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jc := &consumer{ - pq: pq, - log: log, - eh: e, - pool: cPool, - network: dsn[0], - addr: dsn[1], - tout: globalCfg.Timeout, - tName: pipeCfg.Tube, - reserveTimeout: pipeCfg.ReserveTimeout, - tubePriority: pipeCfg.TubePriority, - priority: pipeCfg.PipePriority, - - // buffered with two because jobs root plugin can call Stop at the same time as Pause - stopCh: make(chan struct{}, 2), - requeueCh: make(chan *Item, 1000), - reconnectCh: make(chan struct{}, 2), - } - - return jc, nil -} - -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { - const op = errors.Op("new_beanstalk_consumer") - - // PARSE CONFIGURATION ------- - var globalCfg GlobalCfg - - // if no global section - if !cfg.Has(pluginName) { - return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) - } - - err := cfg.UnmarshalKey(pluginName, &globalCfg) - if err != nil { - return nil, errors.E(op, err) - } - - globalCfg.InitDefault() - - // PARSE CONFIGURATION ------- - - dsn := strings.Split(globalCfg.Addr, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) - } - - cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) - if err != nil { - return nil, errors.E(op, err) - } - - // initialize job consumer - jc := &consumer{ - pq: pq, - log: log, - eh: e, - pool: cPool, - network: dsn[0], - addr: dsn[1], - tout: globalCfg.Timeout, - tName: pipe.String(tube, "default"), - reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), - priority: pipe.Priority(), - - // buffered with two because jobs root plugin can call Stop at the same time as Pause - stopCh: make(chan struct{}, 2), - requeueCh: make(chan *Item, 1000), - reconnectCh: make(chan struct{}, 2), - } - - return jc, nil -} -func (j *consumer) Push(ctx context.Context, jb *job.Job) error { - const op = errors.Op("beanstalk_push") - // check if the pipeline registered - - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != jb.Options.Pipeline { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) - } - - err := j.handleItem(ctx, fromJob(jb)) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (j *consumer) handleItem(ctx context.Context, item *Item) error { - const op = errors.Op("beanstalk_handle_item") - - bb := new(bytes.Buffer) - bb.Grow(64) - err := gob.NewEncoder(bb).Encode(item) - if err != nil { - return errors.E(op, err) - } - - body := make([]byte, bb.Len()) - copy(body, bb.Bytes()) - bb.Reset() - bb = nil - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 - // <pri> is an integer < 2**32. Jobs with smaller priority values will be - // scheduled before jobs with larger priorities. The most urgent priority is 0; - // the least urgent priority is 4,294,967,295. - // - // <delay> is an integer number of seconds to wait before putting the job in - // the ready queue. The job will be in the "delayed" state during this time. - // Maximum delay is 2**32-1. - // - // <ttr> -- time to run -- is an integer number of seconds to allow a worker - // to run this job. This time is counted from the moment a worker reserves - // this job. If the worker does not delete, release, or bury the job within - // <ttr> seconds, the job will time out and the server will release the job. - // The minimum ttr is 1. If the client sends 0, the server will silently - // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) - if err != nil { - errD := j.pool.Delete(ctx, id) - if errD != nil { - return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) - } - return errors.E(op, err) - } - - return nil -} - -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - // register the pipeline - j.pipeline.Store(p) - return nil -} - -// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { - const op = errors.Op("beanstalk_state") - stat, err := j.pool.Stats(ctx) - if err != nil { - return nil, errors.E(op, err) - } - - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - out := &jobState.State{ - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Queue: j.tName, - Ready: ready(atomic.LoadUint32(&j.listeners)), - } - - // set stat, skip errors (replace with 0) - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 - if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { - out.Active = int64(v) - } - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 - if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { - // this is not an error, reserved in beanstalk behaves like an active jobs - out.Reserved = int64(v) - } - - // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 - if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { - out.Delayed = int64(v) - } - - return out, nil -} - -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("beanstalk_run") - start := time.Now() - - // load atomic value - // check if the pipeline registered - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p.Name() { - return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) - } - - atomic.AddUint32(&j.listeners, 1) - - go j.listen() - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (j *consumer) Stop(context.Context) error { - start := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - if atomic.LoadUint32(&j.listeners) == 1 { - j.stopCh <- struct{}{} - } - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (j *consumer) Pause(_ context.Context, p string) { - start := time.Now() - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 0 { - j.log.Warn("no active listeners, nothing to pause") - return - } - - atomic.AddUint32(&j.listeners, ^uint32(0)) - - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipePaused, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func (j *consumer) Resume(_ context.Context, p string) { - start := time.Now() - // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) - if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) - return - } - - l := atomic.LoadUint32(&j.listeners) - // no active listeners - if l == 1 { - j.log.Warn("sqs listener already in the active state") - return - } - - // start listener - go j.listen() - - // increase num of listeners - atomic.AddUint32(&j.listeners, 1) - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeActive, - Driver: pipe.Driver(), - Pipeline: pipe.Name(), - Start: start, - Elapsed: time.Since(start), - }) -} - -func ready(r uint32) bool { - return r > 0 -} diff --git a/plugins/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go deleted file mode 100644 index e43207eb..00000000 --- a/plugins/beanstalk/encode_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package beanstalk - -import ( - "bytes" - "crypto/rand" - "encoding/gob" - "testing" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/utils" -) - -func BenchmarkEncodeGob(b *testing.B) { - tb := make([]byte, 1024*10) - _, err := rand.Read(tb) - if err != nil { - b.Fatal(err) - } - - item := &Item{ - Job: "/super/test/php/class/loooooong", - Ident: "12341234-asdfasdfa-1234234-asdfasdfas", - Payload: utils.AsString(tb), - Headers: map[string][]string{"Test": {"test1", "test2"}}, - Options: &Options{ - Priority: 10, - Pipeline: "test-local-pipe", - Delay: 10, - }, - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - bb := new(bytes.Buffer) - err := gob.NewEncoder(bb).Encode(item) - if err != nil { - b.Fatal(err) - } - _ = bb.Bytes() - bb.Reset() - } -} - -func BenchmarkEncodeJsonIter(b *testing.B) { - tb := make([]byte, 1024*10) - _, err := rand.Read(tb) - if err != nil { - b.Fatal(err) - } - - item := &Item{ - Job: "/super/test/php/class/loooooong", - Ident: "12341234-asdfasdfa-1234234-asdfasdfas", - Payload: utils.AsString(tb), - Headers: map[string][]string{"Test": {"test1", "test2"}}, - Options: &Options{ - Priority: 10, - Pipeline: "test-local-pipe", - Delay: 10, - }, - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - bb, err := json.Marshal(item) - if err != nil { - b.Fatal(err) - } - _ = bb - } -} diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go deleted file mode 100644 index 03060994..00000000 --- a/plugins/beanstalk/item.go +++ /dev/null @@ -1,138 +0,0 @@ -package beanstalk - -import ( - "bytes" - "context" - "encoding/gob" - "time" - - "github.com/beanstalkd/go-beanstalk" - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/utils" -) - -type Item struct { - // Job contains pluginName of job broker (usually PHP class). - Job string `json:"job"` - - // Ident is unique identifier of the job, should be provided from outside - Ident string `json:"id"` - - // Payload is string data (usually JSON) passed to Job broker. - Payload string `json:"payload"` - - // Headers with key-values pairs - Headers map[string][]string `json:"headers"` - - // Options contains set of PipelineOptions specific to job execution. Can be empty. - Options *Options `json:"options,omitempty"` -} - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` - - // Private ================ - id uint64 - conn *beanstalk.Conn - requeueFn func(context.Context, *Item) error -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} - -func (i *Item) ID() string { - return i.Ident -} - -func (i *Item) Priority() int64 { - return i.Options.Priority -} - -// Body packs job payload into binary payload. -func (i *Item) Body() []byte { - return utils.AsBytes(i.Payload) -} - -// Context packs job context (job, id) into binary payload. -// Not used in the sqs, MessageAttributes used instead -func (i *Item) Context() ([]byte, error) { - ctx, err := json.Marshal( - struct { - ID string `json:"id"` - Job string `json:"job"` - Headers map[string][]string `json:"headers"` - Pipeline string `json:"pipeline"` - }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, - ) - - if err != nil { - return nil, err - } - - return ctx, nil -} - -func (i *Item) Ack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Nack() error { - return i.Options.conn.Delete(i.Options.id) -} - -func (i *Item) Requeue(headers map[string][]string, delay int64) error { - // overwrite the delay - i.Options.Delay = delay - i.Headers = headers - - err := i.Options.requeueFn(context.Background(), i) - if err != nil { - return err - } - - // delete old job - err = i.Options.conn.Delete(i.Options.id) - if err != nil { - return err - } - - return nil -} - -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Headers: job.Headers, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - }, - } -} - -func (j *consumer) unpack(id uint64, data []byte, out *Item) error { - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) - if err != nil { - return err - } - out.Options.conn = j.pool.conn - out.Options.id = id - out.Options.requeueFn = j.handleItem - - return nil -} diff --git a/plugins/beanstalk/listen.go b/plugins/beanstalk/listen.go deleted file mode 100644 index 6bb159ea..00000000 --- a/plugins/beanstalk/listen.go +++ /dev/null @@ -1,39 +0,0 @@ -package beanstalk - -import ( - "github.com/beanstalkd/go-beanstalk" -) - -func (j *consumer) listen() { - for { - select { - case <-j.stopCh: - j.log.Warn("beanstalk listener stopped") - return - default: - id, body, err := j.pool.Reserve(j.reserveTimeout) - if err != nil { - if errB, ok := err.(beanstalk.ConnError); ok { - switch errB.Err { //nolint:gocritic - case beanstalk.ErrTimeout: - j.log.Info("beanstalk reserve timeout", "warn", errB.Op) - continue - } - } - // in case of other error - continue - j.log.Error("beanstalk reserve", "error", err) - continue - } - - item := &Item{} - err = j.unpack(id, body, item) - if err != nil { - j.log.Error("beanstalk unpack item", "error", err) - continue - } - - // insert job into the priority queue - j.pq.Insert(item) - } - } -} diff --git a/plugins/beanstalk/plugin.go b/plugins/beanstalk/plugin.go deleted file mode 100644 index 529d1474..00000000 --- a/plugins/beanstalk/plugin.go +++ /dev/null @@ -1,47 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - pluginName string = "beanstalk" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - return nil -} - -func (p *Plugin) Name() string { - return pluginName -} - -func (p *Plugin) Available() {} - -func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq) -} - -func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, eh, pq) -} |