summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/beanstalk
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-16 12:56:02 +0300
committerValery Piashchynski <[email protected]>2021-06-16 12:56:02 +0300
commitcee4bc46097506d6e892b6af194751434700621a (patch)
treee542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/broker/beanstalk
parentd4c92e48bada7593b6fbec612a742c599de6e736 (diff)
- Update jobs sources
- Update Arch diagram Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk')
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker.go185
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker_test.go276
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config.go50
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config_test.go47
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/conn.go180
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/constants.go6
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/consume_test.go242
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/durability_test.go575
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/job.go24
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/sock.bean0
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/stat_test.go66
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube.go250
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube_test.go18
13 files changed, 1919 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go
new file mode 100644
index 00000000..dc3ea518
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/broker.go
@@ -0,0 +1,185 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
+// Broker runs consuming of beanstalk tubes via the Broker service.
+type Broker struct {
+ // cfg holds the beanstalk connection configuration (set by Init).
+ cfg *Config
+ // lsn receives broker events (set by Listen); may be nil.
+ lsn func(event int, ctx interface{})
+ // mu guards registration and serving lifecycle state.
+ mu sync.Mutex
+ // wait is non-nil while the broker is serving; closing it stops Serve.
+ wait chan error
+ // stopped is closed when Serve has fully finished.
+ stopped chan interface{}
+ // conn is the shared beanstalk connection used by Push and Stat.
+ conn *conn
+ // tubes maps registered pipelines to their tube consumers.
+ tubes map[*jobs.Pipeline]*tube
+}
+
+// Listen attaches server event watcher.
+// NOTE(review): lsn is written without holding b.mu — confirm Listen is
+// never called concurrently with Serve.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures broker.
+// It stores the config and allocates the pipeline registry. The returned
+// bool indicates the service is enabled; the error is always nil here.
+func (b *Broker) Init(cfg *Config) (bool, error) {
+ b.cfg = cfg
+ b.tubes = make(map[*jobs.Pipeline]*tube)
+
+ return true, nil
+}
+
+// Register adds a pipeline to the broker, creating the backing tube.
+// Registering the same pipeline twice is an error.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, dup := b.tubes[pipe]; dup {
+ return fmt.Errorf("tube `%s` has already been registered", pipe.Name())
+ }
+
+ tb, err := newTube(pipe, b.throw)
+ if err != nil {
+ return err
+ }
+ b.tubes[pipe] = tb
+
+ return nil
+}
+
+// Serve broker pipelines.
+// Serve allocates the shared connection, launches a consumer goroutine for
+// every tube that has an exec pool, emits EventBrokerReady and then blocks
+// until Stop closes the wait channel.
+func (b *Broker) Serve() (err error) {
+ b.mu.Lock()
+
+ if b.conn, err = b.cfg.newConn(); err != nil {
+ // bugfix: release the mutex on the error path; previously it was left
+ // locked, deadlocking every subsequent Broker method.
+ b.mu.Unlock()
+ return err
+ }
+ defer b.conn.Close()
+
+ for _, t := range b.tubes {
+ tt := t
+ if tt.execPool != nil {
+ go tt.serve(b.cfg)
+ }
+ }
+
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
+// Stop all pipelines and wait for Serve to finish.
+// Safe to call on a broker that was never started; repeated calls are no-ops.
+func (b *Broker) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return
+ }
+
+ for _, t := range b.tubes {
+ t.stop()
+ }
+
+ close(b.wait)
+ <-b.stopped
+
+ // bugfix: mark the broker as no longer serving so a repeated Stop does not
+ // close an already-closed channel (panic) and isServing reports correctly.
+ b.wait = nil
+}
+
+// Consume configures pipeline to be consumed. Set execPool to nil to reset
+// consuming. Method can be called before the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ tb, ok := b.tubes[pipe]
+ if !ok {
+ return fmt.Errorf("undefined tube `%s`", pipe.Name())
+ }
+
+ // restart the consumer with the new handlers
+ tb.stop()
+ tb.execPool = execPool
+ tb.errHandler = errHandler
+
+ if b.conn != nil && tb.execPool != nil {
+ go tb.serve(b.cfg)
+ }
+
+ return nil
+}
+
+// Push data into the worker.
+// Fails when the broker is not serving or the pipeline was never registered;
+// on success returns the beanstalk job id of the queued payload.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ t := b.tube(pipe)
+ if t == nil {
+ return "", fmt.Errorf("undefined tube `%s`", pipe.Name())
+ }
+
+ // serialize the job (gob, see pack) before handing it to beanstalk
+ data, err := pack(j)
+ if err != nil {
+ return "", err
+ }
+
+ // NOTE(review): b.conn is read without holding b.mu — confirm Serve cannot
+ // swap it concurrently with a Push.
+ return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration())
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+// Requires a running broker and a registered pipeline.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ t := b.tube(pipe)
+ if t == nil {
+ return nil, fmt.Errorf("undefined tube `%s`", pipe.Name())
+ }
+
+ // NOTE(review): b.conn is read without holding b.mu — confirm safe.
+ return t.stat(b.conn)
+}
+
+// isServing returns an error unless Serve has been started (b.wait is only
+// non-nil while the broker is running).
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait != nil {
+ return nil
+ }
+
+ return fmt.Errorf("broker is not running")
+}
+
+// tube returns the tube associated with the pipeline, or nil when the
+// pipeline was never registered.
+func (b *Broker) tube(pipe *jobs.Pipeline) *tube {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ // a missing key yields the zero value, which is nil for a pointer type
+ return b.tubes[pipe]
+}
+
+// throw handles service, server and pool events by forwarding them to the
+// listener attached via Listen, if any.
+func (b *Broker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go
new file mode 100644
index 00000000..cd2132af
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/broker_test.go
@@ -0,0 +1,276 @@
+package beanstalk
+
+import (
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+var (
+ pipe = &jobs.Pipeline{
+ "broker": "beanstalk",
+ "name": "default",
+ "tube": "test",
+ }
+
+ cfg = &Config{
+ Addr: "tcp://localhost:11300",
+ }
+)
+
+// init purges any jobs left in the test tube by previous runs so the
+// assertions below start from an empty queue.
+func init() {
+ conn, err := beanstalk.Dial("tcp", "localhost:11300")
+ if err != nil {
+ panic(err)
+ }
+ defer conn.Close()
+
+ // bugfix: purge the tube the tests actually use ("test", see pipe above);
+ // the previous name "testTube" made this cleanup a no-op.
+ t := beanstalk.Tube{Name: "test", Conn: conn}
+
+ for {
+ id, _, err := t.PeekReady()
+ if id == 0 || err != nil {
+ break
+ }
+
+ if err := conn.Delete(id); err != nil {
+ panic(err)
+ }
+ }
+}
+
+func TestBroker_Init(t *testing.T) {
+ b := &Broker{}
+ ok, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.True(t, ok)
+ assert.NoError(t, err)
+}
+
+func TestBroker_StopNotStarted(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ b.Stop()
+}
+
+func TestBroker_Register(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+}
+
+func TestBroker_Register_Twice(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+ assert.Error(t, b.Register(pipe))
+}
+
+func TestBroker_Register_Invalid(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Error(t, b.Register(&jobs.Pipeline{
+ "broker": "beanstalk",
+ "name": "default",
+ }))
+}
+
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_Undefined(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = b.Consume(pipe, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_Serve_Error(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(&Config{
+ Addr: "tcp://localhost:11399",
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Serve())
+}
+
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ err = b.Consume(pipe, exec, errf)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_PushToNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_PushToNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go
new file mode 100644
index 00000000..3e48a2d7
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/config.go
@@ -0,0 +1,50 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/service"
+ "strings"
+ "time"
+)
+
+// Config defines beanstalk broker configuration.
+type Config struct {
+ // Addr of beanstalk server as a DSN, e.g. tcp://localhost:11300
+ // or unix://beanstalk.sock (see newConn).
+ Addr string
+
+ // Timeout to allocate the connection, in seconds. Default 10 seconds.
+ Timeout int
+}
+
+// Hydrate config values from the service configuration and validate them.
+// Returns an error when unmarshalling fails or the address is empty.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.Addr == "" {
+ return fmt.Errorf("beanstalk address is missing")
+ }
+
+ return nil
+}
+
+// TimeoutDuration returns the connection allocation timeout, defaulting to
+// ten seconds when no value was configured.
+func (c *Config) TimeoutDuration() time.Duration {
+ if c.Timeout == 0 {
+ return 10 * time.Second
+ }
+
+ return time.Duration(c.Timeout) * time.Second
+}
+
+// newConn allocates a beanstalk connection from the configured DSN
+// (e.g. tcp://localhost:11300 or unix://beanstalk.sock).
+func (c *Config) newConn() (*conn, error) {
+ dsn := strings.Split(c.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)")
+ }
+
+ return newConn(dsn[0], dsn[1], c.TimeoutDuration())
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go
new file mode 100644
index 00000000..4ba08a04
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/config_test.go
@@ -0,0 +1,47 @@
+package beanstalk
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func TestConfig_Hydrate_Error(t *testing.T) {
+ cfg := &mockCfg{`{"dead`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func TestConfig_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"addr":""}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func TestConfig_Hydrate_Error3(t *testing.T) {
+ cfg := &mockCfg{`{"addr":"tcp"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+
+ _, err := c.newConn()
+ assert.Error(t, err)
+}
+
+func TestConfig_Hydrate_Error4(t *testing.T) {
+ cfg := &mockCfg{`{"addr":"unix://sock.bean"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+
+ _, err := c.newConn()
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go
new file mode 100644
index 00000000..7aba6bbb
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/conn.go
@@ -0,0 +1,180 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "strings"
+ "sync"
+ "time"
+)
+
+var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+
+// connFactory creates new connections; satisfied by *Config (see config.go).
+type connFactory interface {
+ newConn() (*conn, error)
+}
+
+// conn protects allocation for one connection between
+// threads and provides reconnecting capabilities.
+type conn struct {
+ // tout bounds how long acquire() waits for the free token.
+ tout time.Duration
+ // conn is the underlying beanstalk connection; replaced by watch() on redial.
+ conn *beanstalk.Conn
+ // alive stays true until watch() completes the shutdown handshake.
+ alive bool
+ // free carries the single "connection available" token (capacity 1).
+ free chan interface{}
+ // dead receives the error that triggered a reconnect (capacity 1).
+ dead chan interface{}
+ // stop is closed by Close() to terminate watch().
+ stop chan interface{}
+ // lock synchronizes Close() with the shutdown performed by watch().
+ lock *sync.Cond
+}
+
+// newConn creates a new beanstalk connection and starts the reconnect
+// watcher goroutine, which runs until Close() is called.
+func newConn(network, addr string, tout time.Duration) (cn *conn, err error) {
+ cn = &conn{
+ tout: tout,
+ alive: true,
+ free: make(chan interface{}, 1),
+ dead: make(chan interface{}, 1),
+ stop: make(chan interface{}),
+ lock: sync.NewCond(&sync.Mutex{}),
+ }
+
+ cn.conn, err = beanstalk.Dial(network, addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // watch() seeds the free token, so acquire() only succeeds after it starts
+ go cn.watch(network, addr)
+
+ return cn, nil
+}
+
+// Close stops the reconnect watcher and blocks until watch() has shut the
+// underlying beanstalk connection down.
+func (cn *conn) Close() error {
+ cn.lock.L.Lock()
+ defer cn.lock.L.Unlock()
+
+ close(cn.stop)
+ // watch() flips alive to false and signals once the connection is closed
+ for cn.alive {
+ cn.lock.Wait()
+ }
+
+ return nil
+}
+
+// acquire connection instance or return error in case of timeout. When
+// mandatory is set to true the timeout won't be applied.
+// Receiving from free takes the single ownership token; the caller must
+// return it via release().
+func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) {
+ // do not apply timeout on mandatory connections
+ if mandatory {
+ select {
+ case <-cn.stop:
+ return nil, fmt.Errorf("connection closed")
+ case <-cn.free:
+ return cn.conn, nil
+ }
+ }
+
+ select {
+ case <-cn.stop:
+ return nil, fmt.Errorf("connection closed")
+ case <-cn.free:
+ return cn.conn, nil
+ default:
+ // *2 to handle commands called right after the connection reset
+ tout := time.NewTimer(cn.tout * 2)
+ select {
+ case <-cn.stop:
+ tout.Stop()
+ return nil, fmt.Errorf("connection closed")
+ case <-cn.free:
+ tout.Stop()
+ return cn.conn, nil
+ case <-tout.C:
+ return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout)
+ }
+ }
+}
+
+// release returns the ownership token after use: back to the free pool for a
+// healthy connection, or to the dead channel to trigger a reconnect. The
+// original error is passed through unchanged.
+func (cn *conn) release(err error) error {
+ if !isConnError(err) {
+ cn.free <- nil
+ return err
+ }
+
+ // reconnect is required
+ cn.dead <- err
+ return err
+}
+
+// watch and reconnect if dead.
+// watch owns the connection lifecycle: it seeds the free token, redials with
+// exponential backoff (throttled to WatchThrottleLimit) whenever release()
+// reports a dead socket, and performs the shutdown handshake with Close().
+func (cn *conn) watch(network, addr string) {
+ cn.free <- nil
+ t := time.NewTicker(WatchThrottleLimit)
+ defer t.Stop()
+ for {
+ select {
+ case <-cn.dead:
+ // simple throttle limiter
+ <-t.C
+ // try to reconnect
+ // TODO add logging here
+ expb := backoff.NewExponentialBackOff()
+ expb.MaxInterval = cn.tout
+
+ reconnect := func() error {
+ conn, err := beanstalk.Dial(network, addr)
+ if err != nil {
+ fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
+ return err
+ }
+
+ // TODO ADD LOGGING
+ fmt.Println("------beanstalk successfully redialed------")
+
+ // publish the fresh connection and hand the token back to callers
+ cn.conn = conn
+ cn.free <- nil
+ return nil
+ }
+
+ err := backoff.Retry(reconnect, expb)
+ if err != nil {
+ fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
+ // re-arm the dead channel so the next loop iteration retries
+ cn.dead <- nil
+ }
+
+ case <-cn.stop:
+ cn.lock.L.Lock()
+ // drain whichever state token is pending so no caller can grab the conn
+ select {
+ case <-cn.dead:
+ case <-cn.free:
+ }
+
+ // stop underlying connection
+ cn.conn.Close()
+ cn.alive = false
+ cn.lock.Signal()
+
+ cn.lock.L.Unlock()
+
+ return
+ }
+ }
+}
+
+// isConnError indicates that the error is related to a dead socket, by
+// matching the message against the known connection-failure markers.
+func isConnError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ msg := err.Error()
+ for _, marker := range connErrors {
+ if strings.Contains(msg, marker) {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go
new file mode 100644
index 00000000..84be305e
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/constants.go
@@ -0,0 +1,6 @@
+package beanstalk
+
+import "time"
+
+// WatchThrottleLimit is used to limit reconnection occurrence in watch function
+const WatchThrottleLimit = time.Second \ No newline at end of file
diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go
new file mode 100644
index 00000000..b16866ae
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/consume_test.go
@@ -0,0 +1,242 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Consume_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_Delayed(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Delay: 1},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ start := time.Now()
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ elapsed := time.Since(start)
+ assert.True(t, elapsed >= time.Second)
+ assert.True(t, elapsed < 2*time.Second)
+}
+
+func TestBroker_Consume_Errored(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ close(errHandled)
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return fmt.Errorf("job failed")
+ }
+
+ <-waitJob
+ <-errHandled
+}
+
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ attempts := 0
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ attempts++
+ errHandled <- nil
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Attempts: 3},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ return fmt.Errorf("job failed")
+ }
+
+ <-errHandled
+ <-errHandled
+ <-errHandled
+ assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go
new file mode 100644
index 00000000..499a5206
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/durability_test.go
@@ -0,0 +1,575 @@
+package beanstalk
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
+var (
+ proxyCfg = &Config{
+ Addr: "tcp://localhost:11301",
+ Timeout: 1,
+ }
+
+ proxy = &tcpProxy{
+ listen: "localhost:11301",
+ upstream: "localhost:11300",
+ accept: true,
+ }
+)
+
+// tcpProxy is a test helper relaying TCP traffic between the tests and a
+// real beanstalk server so connections can be severed on demand.
+type tcpProxy struct {
+ // listen is the local address the proxy accepts on.
+ listen string
+ // upstream is the beanstalk server address being proxied.
+ upstream string
+ // mu guards accept and conn.
+ mu sync.Mutex
+ // accept controls whether new client connections are serviced.
+ accept bool
+ // conn tracks both ends of every proxied connection.
+ conn []net.Conn
+}
+
+// serve accepts client connections and proxies them to the upstream
+// beanstalk server, recording both ends so reset() can sever them.
+func (p *tcpProxy) serve() {
+ l, err := net.Listen("tcp", p.listen)
+ if err != nil {
+ panic(err)
+ }
+
+ for {
+ in, err := l.Accept()
+ if err != nil {
+ panic(err)
+ }
+
+ if !p.accepting() {
+ in.Close()
+ // bugfix: skip rejected connections entirely; previously the code fell
+ // through, dialed upstream anyway and registered the dead pair, which
+ // inflated the counts observed by waitConn.
+ continue
+ }
+
+ up, err := net.Dial("tcp", p.upstream)
+ if err != nil {
+ panic(err)
+ }
+
+ go io.Copy(in, up)
+ go io.Copy(up, in)
+
+ p.mu.Lock()
+ p.conn = append(p.conn, in, up)
+ p.mu.Unlock()
+ }
+}
+
+// waitConn re-enables accepting and busy-waits (1ms polls) until at least
+// count proxied connections (two net.Conn entries each) are tracked.
+func (p *tcpProxy) waitConn(count int) *tcpProxy {
+ p.mu.Lock()
+ p.accept = true
+ p.mu.Unlock()
+
+ for {
+ p.mu.Lock()
+ current := len(p.conn)
+ p.mu.Unlock()
+
+ // each logical connection contributes a client and an upstream conn
+ if current >= count*2 {
+ break
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+
+ return p
+}
+
+// reset severs every tracked connection, updates the accept flag and returns
+// the number of logical (client+upstream pair) connections dropped.
+func (p *tcpProxy) reset(accept bool) int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.accept = accept
+
+ dropped := len(p.conn)
+ for _, c := range p.conn {
+ c.Close()
+ }
+ p.conn = nil
+
+ return dropped / 2
+}
+
+// accepting reports whether new client connections should be serviced.
+func (p *tcpProxy) accepting() bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.accept
+}
+
+func init() {
+ go proxy.serve()
+}
+
+func TestBroker_Durability_Base(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ // expect 2 connections
+ proxy.waitConn(2)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Durability_Consume(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(2).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ done[id] = true
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ st, err := b.Stat(pipe)
+ if err != nil {
+ continue
+ }
+
+ // wait till pipeline is empty
+ if st.Queue+st.Active == 0 {
+ return
+ }
+ }
+}
+
+func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // reoccuring
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ time.Sleep(3 * time.Second)
+ proxy.waitConn(1)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NotEqual(t, "0", jid)
+
+ assert.NoError(t, perr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume2(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(2).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ st, serr := b.Stat(pipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(1), st.Queue+st.Active)
+
+ proxy.reset(true)
+
+ // auto-reconnect
+ _, serr = b.Stat(pipe)
+ assert.NoError(t, serr)
+
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ done[id] = true
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ st, err := b.Stat(pipe)
+ if err != nil {
+ continue
+ }
+
+ // wait till pipeline is empty
+ if st.Queue+st.Active == 0 {
+ return
+ }
+ }
+}
+
+func TestBroker_Durability_Consume3(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(2)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ st, serr := b.Stat(pipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(1), st.Queue+st.Active)
+
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ done[id] = true
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ st, err := b.Stat(pipe)
+ if err != nil {
+ continue
+ }
+
+ // wait till pipeline is empty
+ if st.Queue+st.Active == 0 {
+ return
+ }
+ }
+}
+
+func TestBroker_Durability_Consume4(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(2)
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "kill",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ st, serr := b.Stat(pipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(3), st.Queue+st.Active)
+
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ done[id] = true
+ if j.Payload == "kill" {
+ proxy.reset(true)
+ }
+
+ return nil
+ }
+
+ for {
+ st, err := b.Stat(pipe)
+ if err != nil {
+ continue
+ }
+
+ // wait till pipeline is empty
+ if st.Queue+st.Active == 0 {
+ return
+ }
+ }
+}
+
+func TestBroker_Durability_StopDead(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+
+ <-ready
+
+ proxy.waitConn(2).reset(false)
+
+ b.Stop()
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go
new file mode 100644
index 00000000..fd9c8c3c
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/job.go
@@ -0,0 +1,24 @@
+package beanstalk
+
+import (
+ "bytes"
+ "encoding/gob"
+ "github.com/spiral/jobs/v2"
+)
+
+func pack(j *jobs.Job) ([]byte, error) {
+ b := new(bytes.Buffer)
+ err := gob.NewEncoder(b).Encode(j)
+ if err != nil {
+ return nil, err
+ }
+
+ return b.Bytes(), nil
+}
+
+func unpack(data []byte) (*jobs.Job, error) {
+ j := &jobs.Job{}
+ err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j)
+
+ return j, err
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/sock.bean
diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go
new file mode 100644
index 00000000..14a55859
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/stat_test.go
@@ -0,0 +1,66 @@
+package beanstalk
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Stat(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ // beanstalk reserves job right after push
+ time.Sleep(time.Millisecond * 100)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, int64(1), stat.Active)
+
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ stat, err = b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), stat.Queue)
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go
new file mode 100644
index 00000000..9d7ad117
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/tube.go
@@ -0,0 +1,250 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// tube wraps a single beanstalk tube: it produces jobs through tube/Put
// and consumes them through tubeSet/Reserve, tracking in-flight handlers.
type tube struct {
	active  int32          // 1 while serve() runs, 0 otherwise (accessed atomically)
	pipe    *jobs.Pipeline
	mut     sync.Mutex     // serializes tube.Conn swap + Put/Stats calls
	tube    *beanstalk.Tube
	tubeSet *beanstalk.TubeSet
	reserve time.Duration  // how long Reserve blocks waiting for a job

	// tube events
	lsn func(event int, ctx interface{})

	// stop channel; closed by stop() to signal serve()/consume() to exit
	wait chan interface{}

	// active operations: muw excludes stop() from waiting mid-reserve,
	// wg counts reserved-but-unfinished jobs
	muw sync.RWMutex
	wg  sync.WaitGroup

	// exec handlers
	execPool   chan jobs.Handler
	errHandler jobs.ErrorHandler
}
+
// entry represents a single reserved beanstalk job: its numeric id and
// the raw (gob-encoded) payload.
type entry struct {
	id   uint64
	data []byte
}

// String renders the job id as a decimal string (the broker-facing job id).
// strconv.FormatUint avoids the reflection overhead of fmt.Sprintf("%v", ...).
func (e *entry) String() string {
	return strconv.FormatUint(e.id, 10)
}
+
+// create new tube consumer and producer
+func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
+ if pipe.String("tube", "") == "" {
+ return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
+ }
+
+ return &tube{
+ pipe: pipe,
+ tube: &beanstalk.Tube{Name: pipe.String("tube", "")},
+ tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
+ reserve: pipe.Duration("reserve", time.Second),
+ lsn: lsn,
+ }, nil
+}
+
+// run consumers
+func (t *tube) serve(connector connFactory) {
+ // tube specific consume connection
+ cn, err := connector.newConn()
+ if err != nil {
+ t.report(err)
+ return
+ }
+ defer cn.Close()
+
+ t.wait = make(chan interface{})
+ atomic.StoreInt32(&t.active, 1)
+
+ for {
+ e, err := t.consume(cn)
+ if err != nil {
+ if isConnError(err) {
+ t.report(err)
+ }
+ continue
+ }
+
+ if e == nil {
+ return
+ }
+
+ h := <-t.execPool
+ go func(h jobs.Handler, e *entry) {
+ err := t.do(cn, h, e)
+ t.execPool <- h
+ t.wg.Done()
+ t.report(err)
+ }(h, e)
+ }
+}
+
+// fetch consume
+func (t *tube) consume(cn *conn) (*entry, error) {
+ t.muw.Lock()
+ defer t.muw.Unlock()
+
+ select {
+ case <-t.wait:
+ return nil, nil
+ default:
+ conn, err := cn.acquire(false)
+ if err != nil {
+ return nil, err
+ }
+
+ t.tubeSet.Conn = conn
+
+ id, data, err := t.tubeSet.Reserve(t.reserve)
+ cn.release(err)
+
+ if err != nil {
+ return nil, err
+ }
+
+ t.wg.Add(1)
+ return &entry{id: id, data: data}, nil
+ }
+}
+
+// do data
+func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
+ j, err := unpack(e.data)
+ if err != nil {
+ return err
+ }
+
+ err = h(e.String(), j)
+
+ // mandatory acquisition
+ conn, connErr := cn.acquire(true)
+ if connErr != nil {
+ // possible if server is dead
+ return connErr
+ }
+
+ if err == nil {
+ return cn.release(conn.Delete(e.id))
+ }
+
+ stat, statErr := conn.StatsJob(e.id)
+ if statErr != nil {
+ return cn.release(statErr)
+ }
+
+ t.errHandler(e.String(), j, err)
+
+ reserves, ok := strconv.Atoi(stat["reserves"])
+ if ok != nil || !j.Options.CanRetry(reserves-1) {
+ return cn.release(conn.Bury(e.id, 0))
+ }
+
+ return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
+}
+
+// stop tube consuming
+func (t *tube) stop() {
+ if atomic.LoadInt32(&t.active) == 0 {
+ return
+ }
+
+ atomic.StoreInt32(&t.active, 0)
+
+ close(t.wait)
+
+ t.muw.Lock()
+ t.wg.Wait()
+ t.muw.Unlock()
+}
+
+// put data into pool or return error (no wait), this method will try to reattempt operation if
+// dead conn found.
+func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
+ id, err = t.doPut(cn, attempt, data, delay, rrt)
+ if err != nil && isConnError(err) {
+ return t.doPut(cn, attempt, data, delay, rrt)
+ }
+
+ return id, err
+}
+
+// perform put operation
+func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
+ conn, err := cn.acquire(false)
+ if err != nil {
+ return "", err
+ }
+
+ var bid uint64
+
+ t.mut.Lock()
+ t.tube.Conn = conn
+ bid, err = t.tube.Put(data, 0, delay, rrt)
+ t.mut.Unlock()
+
+ return strconv.FormatUint(bid, 10), cn.release(err)
+}
+
+// return tube stats (retries)
+func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
+ stat, err = t.doStat(cn)
+ if err != nil && isConnError(err) {
+ return t.doStat(cn)
+ }
+
+ return stat, err
+}
+
+// return tube stats
+func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
+ conn, err := cn.acquire(false)
+ if err != nil {
+ return nil, err
+ }
+
+ t.mut.Lock()
+ t.tube.Conn = conn
+ values, err := t.tube.Stats()
+ t.mut.Unlock()
+
+ if err != nil {
+ return nil, cn.release(err)
+ }
+
+ stat = &jobs.Stat{InternalName: t.tube.Name}
+
+ if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
+ stat.Queue = int64(v)
+ }
+
+ if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
+ stat.Active = int64(v)
+ }
+
+ if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
+ stat.Delayed = int64(v)
+ }
+
+ return stat, cn.release(nil)
+}
+
+// report tube specific error
+func (t *tube) report(err error) {
+ if err != nil {
+ t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
+ }
+}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go
new file mode 100644
index 00000000..b6a646f4
--- /dev/null
+++ b/plugins/jobs/oooold/broker/beanstalk/tube_test.go
@@ -0,0 +1,18 @@
+package beanstalk
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestTube_CantServe(t *testing.T) {
+ var gctx interface{}
+ tube := &tube{
+ lsn: func(event int, ctx interface{}) {
+ gctx = ctx
+ },
+ }
+
+ tube.serve(&Config{Addr: "broken"})
+ assert.Error(t, gctx.(error))
+}