diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk')
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/broker.go | 185 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/broker_test.go | 276 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/config.go | 50 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/config_test.go | 47 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/conn.go | 180 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/constants.go | 6 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/consume_test.go | 242 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/durability_test.go | 575 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/job.go | 24 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/sock.bean | 0 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/stat_test.go | 66 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/tube.go | 250 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/tube_test.go | 18 |
13 files changed, 0 insertions, 1919 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go deleted file mode 100644 index dc3ea518..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker.go +++ /dev/null @@ -1,185 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run consume using Broker service. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - conn *conn - tubes map[*jobs.Pipeline]*tube -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init(cfg *Config) (bool, error) { - b.cfg = cfg - b.tubes = make(map[*jobs.Pipeline]*tube) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.tubes[pipe]; ok { - return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) - } - - t, err := newTube(pipe, b.throw) - if err != nil { - return err - } - - b.tubes[pipe] = t - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.conn, err = b.cfg.newConn(); err != nil { - return err - } - defer b.conn.Close() - - for _, t := range b.tubes { - tt := t - if tt.execPool != nil { - go tt.serve(b.cfg) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, t := range b.tubes { - t.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - t.stop() - - t.execPool = execPool - t.errHandler = errHandler - - if b.conn != nil { - tt := t - if tt.execPool != nil { - go tt.serve(connFactory(b.cfg)) - } - } - - return nil -} - -// Push data into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - t := b.tube(pipe) - if t == nil { - return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - data, err := pack(j) - if err != nil { - return "", err - } - - return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - t := b.tube(pipe) - if t == nil { - return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - return t.stat(b.conn) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) tube(pipe *jobs.Pipeline) *tube { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return nil - } - - return t -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go deleted file mode 100644 index cd2132af..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go +++ /dev/null @@ -1,276 +0,0 @@ -package beanstalk - -import ( - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - "tube": "test", - } - - cfg = &Config{ - Addr: "tcp://localhost:11300", - } -) - -func init() { - conn, err := beanstalk.Dial("tcp", "localhost:11300") - if err != nil { - panic(err) - } - defer conn.Close() - - t := beanstalk.Tube{Name: "testTube", Conn: conn} - - for { - id, _, err := t.PeekReady() - if id == 0 || err != nil { - break - } - - if err := conn.Delete(id); err != nil { - panic(err) - } - } -} - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Register_Invalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - })) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Error(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "tcp://localhost:11399", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go deleted file mode 100644 index 3e48a2d7..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config.go +++ /dev/null @@ -1,50 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "strings" - "time" -) - -// Config defines beanstalk broker configuration. -type Config struct { - // Addr of beanstalk server. - Addr string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Addr == "" { - return fmt.Errorf("beanstalk address is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// size creates new rpc socket Listener. -func (c *Config) newConn() (*conn, error) { - dsn := strings.Split(c.Addr, "://") - if len(dsn) != 2 { - return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") - } - - return newConn(dsn[0], dsn[1], c.TimeoutDuration()) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go deleted file mode 100644 index 4ba08a04..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package beanstalk - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func TestConfig_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"addr":""}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"addr":"tcp"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} - -func TestConfig_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go deleted file mode 100644 index 7aba6bbb..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/conn.go +++ /dev/null @@ -1,180 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "strings" - "sync" - "time" -) - -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} - -// creates new connections -type connFactory interface { - newConn() (*conn, error) -} - -// conn protects allocation for one connection between -// threads and provides reconnecting capabilities. -type conn struct { - tout time.Duration - conn *beanstalk.Conn - alive bool - free chan interface{} - dead chan interface{} - stop chan interface{} - lock *sync.Cond -} - -// creates new beanstalk connection and reconnect watcher. -func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { - cn = &conn{ - tout: tout, - alive: true, - free: make(chan interface{}, 1), - dead: make(chan interface{}, 1), - stop: make(chan interface{}), - lock: sync.NewCond(&sync.Mutex{}), - } - - cn.conn, err = beanstalk.Dial(network, addr) - if err != nil { - return nil, err - } - - go cn.watch(network, addr) - - return cn, nil -} - -// reset the connection and reconnect watcher. -func (cn *conn) Close() error { - cn.lock.L.Lock() - defer cn.lock.L.Unlock() - - close(cn.stop) - for cn.alive { - cn.lock.Wait() - } - - return nil -} - -// acquire connection instance or return error in case of timeout. When mandratory set to true -// timeout won't be applied. -func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { - // do not apply timeout on mandatory connections - if mandatory { - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - } - } - - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - default: - // *2 to handle commands called right after the connection reset - tout := time.NewTimer(cn.tout * 2) - select { - case <-cn.stop: - tout.Stop() - return nil, fmt.Errorf("connection closed") - case <-cn.free: - tout.Stop() - return cn.conn, nil - case <-tout.C: - return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) - } - } -} - -// release acquired connection. -func (cn *conn) release(err error) error { - if isConnError(err) { - // reconnect is required - cn.dead <- err - } else { - cn.free <- nil - } - - return err -} - -// watch and reconnect if dead -func (cn *conn) watch(network, addr string) { - cn.free <- nil - t := time.NewTicker(WatchThrottleLimit) - defer t.Stop() - for { - select { - case <-cn.dead: - // simple throttle limiter - <-t.C - // try to reconnect - // TODO add logging here - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cn.tout - - reconnect := func() error { - conn, err := beanstalk.Dial(network, addr) - if err != nil { - fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) - return err - } - - // TODO ADD LOGGING - fmt.Println("------beanstalk successfully redialed------") - - cn.conn = conn - cn.free <- nil - return nil - } - - err := backoff.Retry(reconnect, expb) - if err != nil { - fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) - cn.dead <- nil - } - - case <-cn.stop: - cn.lock.L.Lock() - select { - case <-cn.dead: - case <-cn.free: - } - - // stop underlying connection - cn.conn.Close() - cn.alive = false - cn.lock.Signal() - - cn.lock.L.Unlock() - - return - } - } -} - -// isConnError indicates that error is related to dead socket. -func isConnError(err error) bool { - if err == nil { - return false - } - - for _, errStr := range connErrors { - // golang... - if strings.Contains(err.Error(), errStr) { - return true - } - } - - return false -} diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go deleted file mode 100644 index 84be305e..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/constants.go +++ /dev/null @@ -1,6 +0,0 @@ -package beanstalk - -import "time" - -// WatchThrottleLimit is used to limit reconnection occurrence in watch function -const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go deleted file mode 100644 index b16866ae..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go +++ /dev/null @@ -1,242 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go deleted file mode 100644 index 499a5206..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go +++ /dev/null @@ -1,575 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "tcp://localhost:11301", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:11301", - upstream: "localhost:11300", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // reoccuring - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(true) - - // auto-reconnect - _, serr = b.Stat(pipe) - assert.NoError(t, serr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go deleted file mode 100644 index fd9c8c3c..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/job.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import ( - "bytes" - "encoding/gob" - "github.com/spiral/jobs/v2" -) - -func pack(j *jobs.Job) ([]byte, error) { - b := new(bytes.Buffer) - err := gob.NewEncoder(b).Encode(j) - if err != nil { - return nil, err - } - - return b.Bytes(), nil -} - -func unpack(data []byte) (*jobs.Job, error) { - j := &jobs.Job{} - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) - - return j, err -} diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean deleted file mode 100644 index e69de29b..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/sock.bean +++ /dev/null diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go deleted file mode 100644 index 14a55859..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // beanstalk reserves job right after push - time.Sleep(time.Millisecond * 100) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go deleted file mode 100644 index 9d7ad117..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube.go +++ /dev/null @@ -1,250 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type tube struct { - active int32 - pipe *jobs.Pipeline - mut sync.Mutex - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet - reserve time.Duration - - // tube events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id uint64 - data []byte -} - -func (e *entry) String() string { - return fmt.Sprintf("%v", e.id) -} - -// create new tube consumer and producer -func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { - if pipe.String("tube", "") == "" { - return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") - } - - return &tube{ - pipe: pipe, - tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, - tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), - reserve: pipe.Duration("reserve", time.Second), - lsn: lsn, - }, nil -} - -// run consumers -func (t *tube) serve(connector connFactory) { - // tube specific consume connection - cn, err := connector.newConn() - if err != nil { - t.report(err) - return - } - defer cn.Close() - - t.wait = make(chan interface{}) - atomic.StoreInt32(&t.active, 1) - - for { - e, err := t.consume(cn) - if err != nil { - if isConnError(err) { - t.report(err) - } - continue - } - - if e == nil { - return - } - - h := <-t.execPool - go func(h jobs.Handler, e *entry) { - err := t.do(cn, h, e) - t.execPool <- h - t.wg.Done() - t.report(err) - }(h, e) - } -} - -// fetch consume -func (t *tube) consume(cn *conn) (*entry, error) { - t.muw.Lock() - defer t.muw.Unlock() - - select { - case <-t.wait: - return nil, nil - default: - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.tubeSet.Conn = conn - - id, data, err := t.tubeSet.Reserve(t.reserve) - cn.release(err) - - if err != nil { - return nil, err - } - - t.wg.Add(1) - return &entry{id: id, data: data}, nil - } -} - -// do data -func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { - j, err := unpack(e.data) - if err != nil { - return err - } - - err = h(e.String(), j) - - // mandatory acquisition - conn, connErr := cn.acquire(true) - if connErr != nil { - // possible if server is dead - return connErr - } - - if err == nil { - return cn.release(conn.Delete(e.id)) - } - - stat, statErr := conn.StatsJob(e.id) - if statErr != nil { - return cn.release(statErr) - } - - t.errHandler(e.String(), j, err) - - reserves, ok := strconv.Atoi(stat["reserves"]) - if ok != nil || !j.Options.CanRetry(reserves-1) { - return cn.release(conn.Bury(e.id, 0)) - } - - return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) -} - -// stop tube consuming -func (t *tube) stop() { - if atomic.LoadInt32(&t.active) == 0 { - return - } - - atomic.StoreInt32(&t.active, 0) - - close(t.wait) - - t.muw.Lock() - t.wg.Wait() - t.muw.Unlock() -} - -// put data into pool or return error (no wait), this method will try to reattempt operation if -// dead conn found. -func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - id, err = t.doPut(cn, attempt, data, delay, rrt) - if err != nil && isConnError(err) { - return t.doPut(cn, attempt, data, delay, rrt) - } - - return id, err -} - -// perform put operation -func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - conn, err := cn.acquire(false) - if err != nil { - return "", err - } - - var bid uint64 - - t.mut.Lock() - t.tube.Conn = conn - bid, err = t.tube.Put(data, 0, delay, rrt) - t.mut.Unlock() - - return strconv.FormatUint(bid, 10), cn.release(err) -} - -// return tube stats (retries) -func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { - stat, err = t.doStat(cn) - if err != nil && isConnError(err) { - return t.doStat(cn) - } - - return stat, err -} - -// return tube stats -func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.mut.Lock() - t.tube.Conn = conn - values, err := t.tube.Stats() - t.mut.Unlock() - - if err != nil { - return nil, cn.release(err) - } - - stat = &jobs.Stat{InternalName: t.tube.Name} - - if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { - stat.Queue = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { - stat.Active = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { - stat.Delayed = int64(v) - } - - return stat, cn.release(nil) -} - -// report tube specific error -func (t *tube) report(err error) { - if err != nil { - t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go deleted file mode 100644 index b6a646f4..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package beanstalk - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestTube_CantServe(t *testing.T) { - var gctx interface{} - tube := &tube{ - lsn: func(event int, ctx interface{}) { - gctx = ctx - }, - } - - tube.serve(&Config{Addr: "broken"}) - assert.Error(t, gctx.(error)) -} |