diff options
Diffstat (limited to 'plugins/beanstalk')
-rw-r--r-- | plugins/beanstalk/config.go | 53 | ||||
-rw-r--r-- | plugins/beanstalk/connection.go | 223 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go | 360 | ||||
-rw-r--r-- | plugins/beanstalk/encode_test.go | 75 | ||||
-rw-r--r-- | plugins/beanstalk/item.go | 147 | ||||
-rw-r--r-- | plugins/beanstalk/listen.go | 39 | ||||
-rw-r--r-- | plugins/beanstalk/plugin.go | 47 |
7 files changed, 944 insertions, 0 deletions
diff --git a/plugins/beanstalk/config.go b/plugins/beanstalk/config.go new file mode 100644 index 00000000..a8069f5d --- /dev/null +++ b/plugins/beanstalk/config.go @@ -0,0 +1,53 @@ +package beanstalk + +import ( + "time" + + "github.com/spiral/roadrunner/v2/utils" +) + +const ( + tubePriority string = "tube_priority" + tube string = "tube" + reserveTimeout string = "reserve_timeout" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` + Timeout time.Duration `mapstructure:"timeout"` +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "tcp://127.0.0.1:11300" + } + + if c.Timeout == 0 { + c.Timeout = time.Second * 30 + } +} + +type Config struct { + PipePriority int64 `mapstructure:"priority"` + TubePriority *uint32 `mapstructure:"tube_priority"` + Tube string `mapstructure:"tube"` + ReserveTimeout time.Duration `mapstructure:"reserve_timeout"` +} + +func (c *Config) InitDefault() { + if c.Tube == "" { + c.Tube = "default" + } + + if c.ReserveTimeout == 0 { + c.ReserveTimeout = time.Second * 1 + } + + if c.TubePriority == nil { + c.TubePriority = utils.Uint32(0) + } + + if c.PipePriority == 0 { + c.PipePriority = 10 + } +} diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go new file mode 100644 index 00000000..d3241b37 --- /dev/null +++ b/plugins/beanstalk/connection.go @@ -0,0 +1,223 @@ +package beanstalk + +import ( + "context" + "net" + "sync" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type ConnPool struct { + sync.RWMutex + + log logger.Logger + + conn *beanstalk.Conn + connT *beanstalk.Conn + ts *beanstalk.TubeSet + t *beanstalk.Tube + + network string + address string + tName string + tout time.Duration +} + +func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { + connT, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + connTS, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + tube := beanstalk.NewTube(connT, tName) + ts := beanstalk.NewTubeSet(connTS, tName) + + return &ConnPool{ + log: log, + network: network, + address: address, + tName: tName, + tout: tout, + conn: connTS, + connT: connT, + ts: ts, + t: tube, + }, nil +} + +// Put the payload +// TODO use the context ?? +func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { + cp.RLock() + defer cp.RUnlock() + + // TODO(rustatian): redial based on the token + id, err := cp.t.Put(body, pri, delay, ttr) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) + } + } + + return id, nil +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { + cp.RLock() + defer cp.RUnlock() + + id, body, err := cp.ts.Reserve(reserveTimeout) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) + } + } + + return id, body, nil +} + +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { + cp.RLock() + defer cp.RUnlock() + + err := cp.conn.Delete(id) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) + } + } + return nil +} + +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + +func (cp *ConnPool) redial() error { + const op = errors.Op("connection_pool_redial") + + cp.Lock() + // backoff here + expb := backoff.NewExponentialBackOff() + // TODO(rustatian) set via config + expb.MaxElapsedTime = time.Minute + + operation := func() error { + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + + cp.log.Info("beanstalk redial was successful") + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + cp.Unlock() + return retryErr + } + cp.Unlock() + + return nil +} + +var connErrors = map[string]struct{}{"EOF": {}} + +func (cp *ConnPool) checkAndRedial(err error) error { + const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { //nolint:gocritic + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } + + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", err, errR)) + } + // if redial was successful -> continue listening + return nil + } + } + } + + // return initial error + return err +} diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go new file mode 100644 index 00000000..5ef89983 --- /dev/null +++ b/plugins/beanstalk/consumer.go @@ -0,0 +1,360 @@ +package beanstalk + +import ( + "bytes" + "context" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type consumer struct { + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + + pipeline atomic.Value + listeners uint32 + + // beanstalk + pool *ConnPool + addr string + network string + reserveTimeout time.Duration + reconnectCh chan struct{} + tout time.Duration + // tube name + tName string + tubePriority *uint32 + priority int64 + + stopCh chan struct{} + requeueCh chan *Item +} + +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &consumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipeCfg.Tube, + reserveTimeout: pipeCfg.ReserveTimeout, + tubePriority: pipeCfg.TubePriority, + priority: pipeCfg.PipePriority, + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), + } + + return jc, nil +} + +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("new_beanstalk_consumer") + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout")) + } + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + // PARSE CONFIGURATION ------- + + dsn := strings.Split(globalCfg.Addr, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) + } + + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) + if err != nil { + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &consumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipe.String(tube, "default"), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))), + priority: pipe.Priority(), + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), + } + + return jc, nil +} +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { + const op = errors.Op("beanstalk_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != jb.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) + } + + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *consumer) handleItem(ctx context.Context, item *Item) error { + const op = errors.Op("beanstalk_handle_item") + + bb := new(bytes.Buffer) + bb.Grow(64) + err := item.pack(bb) + if err != nil { + return errors.E(op, err) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 + // <pri> is an integer < 2**32. Jobs with smaller priority values will be + // scheduled before jobs with larger priorities. The most urgent priority is 0; + // the least urgent priority is 4,294,967,295. + // + // <delay> is an integer number of seconds to wait before putting the job in + // the ready queue. The job will be in the "delayed" state during this time. + // Maximum delay is 2**32-1. + // + // <ttr> -- time to run -- is an integer number of seconds to allow a worker + // to run this job. This time is counted from the moment a worker reserves + // this job. If the worker does not delete, release, or bury the job within + // <ttr> seconds, the job will time out and the server will release the job. + // The minimum ttr is 1. If the client sends 0, the server will silently + // increase the ttr to 1. Maximum ttr is 2**32-1. + id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + if err != nil { + errD := j.pool.Delete(ctx, id) + if errD != nil { + return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) + } + return errors.E(op, err) + } + + return nil +} + +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + // register the pipeline + j.pipeline.Store(p) + return nil +} + +// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { + const op = errors.Op("beanstalk_state") + stat, err := j.pool.Stats(ctx) + if err != nil { + return nil, errors.E(op, err) + } + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + out := &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: j.tName, + Ready: ready(atomic.LoadUint32(&j.listeners)), + } + + // set stat, skip errors (replace with 0) + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523 + if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil { + out.Active = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525 + if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil { + // this is not an error, reserved in beanstalk behaves like an active jobs + out.Reserved = int64(v) + } + + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528 + if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil { + out.Delayed = int64(v) + } + + return out, nil +} + +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("beanstalk_run") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) + } + + atomic.AddUint32(&j.listeners, 1) + + go j.listen() + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (j *consumer) Stop(context.Context) error { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + + if atomic.LoadUint32(&j.listeners) == 1 { + j.stopCh <- struct{}{} + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (j *consumer) Pause(_ context.Context, p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 0 { + j.log.Warn("no active listeners, nothing to pause") + return + } + + atomic.AddUint32(&j.listeners, ^uint32(0)) + + j.stopCh <- struct{}{} + + j.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (j *consumer) Resume(_ context.Context, p string) { + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + return + } + + l := atomic.LoadUint32(&j.listeners) + // no active listeners + if l == 1 { + j.log.Warn("sqs listener already in the active state") + return + } + + // start listener + go j.listen() + + // increase num of listeners + atomic.AddUint32(&j.listeners, 1) + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go new file mode 100644 index 00000000..e43207eb --- /dev/null +++ b/plugins/beanstalk/encode_test.go @@ -0,0 +1,75 @@ +package beanstalk + +import ( + "bytes" + "crypto/rand" + "encoding/gob" + "testing" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/utils" +) + +func BenchmarkEncodeGob(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb := new(bytes.Buffer) + err := gob.NewEncoder(bb).Encode(item) + if err != nil { + b.Fatal(err) + } + _ = bb.Bytes() + bb.Reset() + } +} + +func BenchmarkEncodeJsonIter(b *testing.B) { + tb := make([]byte, 1024*10) + _, err := rand.Read(tb) + if err != nil { + b.Fatal(err) + } + + item := &Item{ + Job: "/super/test/php/class/loooooong", + Ident: "12341234-asdfasdfa-1234234-asdfasdfas", + Payload: utils.AsString(tb), + Headers: map[string][]string{"Test": {"test1", "test2"}}, + Options: &Options{ + Priority: 10, + Pipeline: "test-local-pipe", + Delay: 10, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + bb, err := json.Marshal(item) + if err != nil { + b.Fatal(err) + } + _ = bb + } +} diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go new file mode 100644 index 00000000..0a6cd560 --- /dev/null +++ b/plugins/beanstalk/item.go @@ -0,0 +1,147 @@ +package beanstalk + +import ( + "bytes" + "context" + "encoding/gob" + "time" + + "github.com/beanstalkd/go-beanstalk" + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Private ================ + id uint64 + conn *beanstalk.Conn + requeueFn func(context.Context, *Item) error +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +// Body packs job payload into binary payload. +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +// Context packs job context (job, id) into binary payload. +// Not used in the sqs, MessageAttributes used instead +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + return i.Options.conn.Delete(i.Options.id) +} + +func (i *Item) Nack() error { + return i.Options.conn.Delete(i.Options.id) +} + +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + // overwrite the delay + i.Options.Delay = delay + i.Headers = headers + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err + } + + // delete old job + err = i.Options.conn.Delete(i.Options.id) + if err != nil { + return err + } + + return nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} + +func (i *Item) pack(b *bytes.Buffer) error { + err := gob.NewEncoder(b).Encode(i) + if err != nil { + return err + } + + return nil +} + +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) + if err != nil { + return err + } + out.Options.conn = j.pool.conn + out.Options.id = id + out.Options.requeueFn = j.handleItem + + return nil +} diff --git a/plugins/beanstalk/listen.go b/plugins/beanstalk/listen.go new file mode 100644 index 00000000..6bb159ea --- /dev/null +++ b/plugins/beanstalk/listen.go @@ -0,0 +1,39 @@ +package beanstalk + +import ( + "github.com/beanstalkd/go-beanstalk" +) + +func (j *consumer) listen() { + for { + select { + case <-j.stopCh: + j.log.Warn("beanstalk listener stopped") + return + default: + id, body, err := j.pool.Reserve(j.reserveTimeout) + if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { //nolint:gocritic + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } + // in case of other error - continue + j.log.Error("beanstalk reserve", "error", err) + continue + } + + item := &Item{} + err = j.unpack(id, body, item) + if err != nil { + j.log.Error("beanstalk unpack item", "error", err) + continue + } + + // insert job into the priority queue + j.pq.Insert(item) + } + } +} diff --git a/plugins/beanstalk/plugin.go b/plugins/beanstalk/plugin.go new file mode 100644 index 00000000..529d1474 --- /dev/null +++ b/plugins/beanstalk/plugin.go @@ -0,0 +1,47 @@ +package beanstalk + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pluginName string = "beanstalk" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + return make(chan error) +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + +func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, eh, pq) +} |