summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/beanstalk
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk')
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker.go185
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker_test.go276
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config.go50
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config_test.go47
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/conn.go180
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/constants.go6
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/consume_test.go242
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/durability_test.go575
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/job.go24
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/sock.bean0
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/stat_test.go66
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube.go250
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube_test.go18
13 files changed, 0 insertions, 1919 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go
deleted file mode 100644
index dc3ea518..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run consume using Broker service.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- conn *conn
- tubes map[*jobs.Pipeline]*tube
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init(cfg *Config) (bool, error) {
- b.cfg = cfg
- b.tubes = make(map[*jobs.Pipeline]*tube)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.tubes[pipe]; ok {
- return fmt.Errorf("tube `%s` has already been registered", pipe.Name())
- }
-
- t, err := newTube(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.tubes[pipe] = t
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.conn, err = b.cfg.newConn(); err != nil {
- return err
- }
- defer b.conn.Close()
-
- for _, t := range b.tubes {
- tt := t
- if tt.execPool != nil {
- go tt.serve(b.cfg)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, t := range b.tubes {
- t.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- t.stop()
-
- t.execPool = execPool
- t.errHandler = errHandler
-
- if b.conn != nil {
- tt := t
- if tt.execPool != nil {
- go tt.serve(connFactory(b.cfg))
- }
- }
-
- return nil
-}
-
-// Push data into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return "", fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- data, err := pack(j)
- if err != nil {
- return "", err
- }
-
- return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration())
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return nil, fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- return t.stat(b.conn)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) tube(pipe *jobs.Pipeline) *tube {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return nil
- }
-
- return t
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go
deleted file mode 100644
index cd2132af..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go
+++ /dev/null
@@ -1,276 +0,0 @@
-package beanstalk
-
-import (
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- "tube": "test",
- }
-
- cfg = &Config{
- Addr: "tcp://localhost:11300",
- }
-)
-
-func init() {
- conn, err := beanstalk.Dial("tcp", "localhost:11300")
- if err != nil {
- panic(err)
- }
- defer conn.Close()
-
- t := beanstalk.Tube{Name: "testTube", Conn: conn}
-
- for {
- id, _, err := t.PeekReady()
- if id == 0 || err != nil {
- break
- }
-
- if err := conn.Delete(id); err != nil {
- panic(err)
- }
- }
-}
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Invalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- }))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Error(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "tcp://localhost:11399",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go
deleted file mode 100644
index 3e48a2d7..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "strings"
- "time"
-)
-
-// Config defines beanstalk broker configuration.
-type Config struct {
- // Addr of beanstalk server.
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("beanstalk address is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// size creates new rpc socket Listener.
-func (c *Config) newConn() (*conn, error) {
- dsn := strings.Split(c.Addr, "://")
- if len(dsn) != 2 {
- return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)")
- }
-
- return newConn(dsn[0], dsn[1], c.TimeoutDuration())
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go
deleted file mode 100644
index 4ba08a04..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package beanstalk
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func TestConfig_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"addr":"tcp"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
-
-func TestConfig_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"addr":"unix://sock.bean"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go
deleted file mode 100644
index 7aba6bbb..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/conn.go
+++ /dev/null
@@ -1,180 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "strings"
- "sync"
- "time"
-)
-
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
-
-// creates new connections
-type connFactory interface {
- newConn() (*conn, error)
-}
-
-// conn protects allocation for one connection between
-// threads and provides reconnecting capabilities.
-type conn struct {
- tout time.Duration
- conn *beanstalk.Conn
- alive bool
- free chan interface{}
- dead chan interface{}
- stop chan interface{}
- lock *sync.Cond
-}
-
-// creates new beanstalk connection and reconnect watcher.
-func newConn(network, addr string, tout time.Duration) (cn *conn, err error) {
- cn = &conn{
- tout: tout,
- alive: true,
- free: make(chan interface{}, 1),
- dead: make(chan interface{}, 1),
- stop: make(chan interface{}),
- lock: sync.NewCond(&sync.Mutex{}),
- }
-
- cn.conn, err = beanstalk.Dial(network, addr)
- if err != nil {
- return nil, err
- }
-
- go cn.watch(network, addr)
-
- return cn, nil
-}
-
-// reset the connection and reconnect watcher.
-func (cn *conn) Close() error {
- cn.lock.L.Lock()
- defer cn.lock.L.Unlock()
-
- close(cn.stop)
- for cn.alive {
- cn.lock.Wait()
- }
-
- return nil
-}
-
-// acquire connection instance or return error in case of timeout. When mandratory set to true
-// timeout won't be applied.
-func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) {
- // do not apply timeout on mandatory connections
- if mandatory {
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- }
- }
-
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- default:
- // *2 to handle commands called right after the connection reset
- tout := time.NewTimer(cn.tout * 2)
- select {
- case <-cn.stop:
- tout.Stop()
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- tout.Stop()
- return cn.conn, nil
- case <-tout.C:
- return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout)
- }
- }
-}
-
-// release acquired connection.
-func (cn *conn) release(err error) error {
- if isConnError(err) {
- // reconnect is required
- cn.dead <- err
- } else {
- cn.free <- nil
- }
-
- return err
-}
-
-// watch and reconnect if dead
-func (cn *conn) watch(network, addr string) {
- cn.free <- nil
- t := time.NewTicker(WatchThrottleLimit)
- defer t.Stop()
- for {
- select {
- case <-cn.dead:
- // simple throttle limiter
- <-t.C
- // try to reconnect
- // TODO add logging here
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cn.tout
-
- reconnect := func() error {
- conn, err := beanstalk.Dial(network, addr)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------beanstalk successfully redialed------")
-
- cn.conn = conn
- cn.free <- nil
- return nil
- }
-
- err := backoff.Retry(reconnect, expb)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
- cn.dead <- nil
- }
-
- case <-cn.stop:
- cn.lock.L.Lock()
- select {
- case <-cn.dead:
- case <-cn.free:
- }
-
- // stop underlying connection
- cn.conn.Close()
- cn.alive = false
- cn.lock.Signal()
-
- cn.lock.L.Unlock()
-
- return
- }
- }
-}
-
-// isConnError indicates that error is related to dead socket.
-func isConnError(err error) bool {
- if err == nil {
- return false
- }
-
- for _, errStr := range connErrors {
- // golang...
- if strings.Contains(err.Error(), errStr) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go
deleted file mode 100644
index 84be305e..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/constants.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package beanstalk
-
-import "time"
-
-// WatchThrottleLimit is used to limit reconnection occurrence in watch function
-const WatchThrottleLimit = time.Second \ No newline at end of file
diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go
deleted file mode 100644
index b16866ae..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go
+++ /dev/null
@@ -1,242 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go
deleted file mode 100644
index 499a5206..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go
+++ /dev/null
@@ -1,575 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "tcp://localhost:11301",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:11301",
- upstream: "localhost:11300",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // reoccuring
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(true)
-
- // auto-reconnect
- _, serr = b.Stat(pipe)
- assert.NoError(t, serr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go
deleted file mode 100644
index fd9c8c3c..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/job.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "encoding/gob"
- "github.com/spiral/jobs/v2"
-)
-
-func pack(j *jobs.Job) ([]byte, error) {
- b := new(bytes.Buffer)
- err := gob.NewEncoder(b).Encode(j)
- if err != nil {
- return nil, err
- }
-
- return b.Bytes(), nil
-}
-
-func unpack(data []byte) (*jobs.Job, error) {
- j := &jobs.Job{}
- err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j)
-
- return j, err
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean
deleted file mode 100644
index e69de29b..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/sock.bean
+++ /dev/null
diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go
deleted file mode 100644
index 14a55859..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // beanstalk reserves job right after push
- time.Sleep(time.Millisecond * 100)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go
deleted file mode 100644
index 9d7ad117..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube.go
+++ /dev/null
@@ -1,250 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type tube struct {
- active int32
- pipe *jobs.Pipeline
- mut sync.Mutex
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
- reserve time.Duration
-
- // tube events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id uint64
- data []byte
-}
-
-func (e *entry) String() string {
- return fmt.Sprintf("%v", e.id)
-}
-
-// create new tube consumer and producer
-func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
- if pipe.String("tube", "") == "" {
- return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
- }
-
- return &tube{
- pipe: pipe,
- tube: &beanstalk.Tube{Name: pipe.String("tube", "")},
- tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
- reserve: pipe.Duration("reserve", time.Second),
- lsn: lsn,
- }, nil
-}
-
-// run consumers
-func (t *tube) serve(connector connFactory) {
- // tube specific consume connection
- cn, err := connector.newConn()
- if err != nil {
- t.report(err)
- return
- }
- defer cn.Close()
-
- t.wait = make(chan interface{})
- atomic.StoreInt32(&t.active, 1)
-
- for {
- e, err := t.consume(cn)
- if err != nil {
- if isConnError(err) {
- t.report(err)
- }
- continue
- }
-
- if e == nil {
- return
- }
-
- h := <-t.execPool
- go func(h jobs.Handler, e *entry) {
- err := t.do(cn, h, e)
- t.execPool <- h
- t.wg.Done()
- t.report(err)
- }(h, e)
- }
-}
-
-// fetch consume
-func (t *tube) consume(cn *conn) (*entry, error) {
- t.muw.Lock()
- defer t.muw.Unlock()
-
- select {
- case <-t.wait:
- return nil, nil
- default:
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.tubeSet.Conn = conn
-
- id, data, err := t.tubeSet.Reserve(t.reserve)
- cn.release(err)
-
- if err != nil {
- return nil, err
- }
-
- t.wg.Add(1)
- return &entry{id: id, data: data}, nil
- }
-}
-
-// do data
-func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
- j, err := unpack(e.data)
- if err != nil {
- return err
- }
-
- err = h(e.String(), j)
-
- // mandatory acquisition
- conn, connErr := cn.acquire(true)
- if connErr != nil {
- // possible if server is dead
- return connErr
- }
-
- if err == nil {
- return cn.release(conn.Delete(e.id))
- }
-
- stat, statErr := conn.StatsJob(e.id)
- if statErr != nil {
- return cn.release(statErr)
- }
-
- t.errHandler(e.String(), j, err)
-
- reserves, ok := strconv.Atoi(stat["reserves"])
- if ok != nil || !j.Options.CanRetry(reserves-1) {
- return cn.release(conn.Bury(e.id, 0))
- }
-
- return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
-}
-
-// stop tube consuming
-func (t *tube) stop() {
- if atomic.LoadInt32(&t.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&t.active, 0)
-
- close(t.wait)
-
- t.muw.Lock()
- t.wg.Wait()
- t.muw.Unlock()
-}
-
-// put data into pool or return error (no wait), this method will try to reattempt operation if
-// dead conn found.
-func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- id, err = t.doPut(cn, attempt, data, delay, rrt)
- if err != nil && isConnError(err) {
- return t.doPut(cn, attempt, data, delay, rrt)
- }
-
- return id, err
-}
-
-// perform put operation
-func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return "", err
- }
-
- var bid uint64
-
- t.mut.Lock()
- t.tube.Conn = conn
- bid, err = t.tube.Put(data, 0, delay, rrt)
- t.mut.Unlock()
-
- return strconv.FormatUint(bid, 10), cn.release(err)
-}
-
-// return tube stats (retries)
-func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
- stat, err = t.doStat(cn)
- if err != nil && isConnError(err) {
- return t.doStat(cn)
- }
-
- return stat, err
-}
-
-// return tube stats
-func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.mut.Lock()
- t.tube.Conn = conn
- values, err := t.tube.Stats()
- t.mut.Unlock()
-
- if err != nil {
- return nil, cn.release(err)
- }
-
- stat = &jobs.Stat{InternalName: t.tube.Name}
-
- if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
- stat.Queue = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
- stat.Active = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
- stat.Delayed = int64(v)
- }
-
- return stat, cn.release(nil)
-}
-
-// report tube specific error
-func (t *tube) report(err error) {
- if err != nil {
- t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go
deleted file mode 100644
index b6a646f4..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package beanstalk
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestTube_CantServe(t *testing.T) {
- var gctx interface{}
- tube := &tube{
- lsn: func(event int, ctx interface{}) {
- gctx = ctx
- },
- }
-
- tube.serve(&Config{Addr: "broken"})
- assert.Error(t, gctx.(error))
-}