diff options
author | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
commit | cee4bc46097506d6e892b6af194751434700621a (patch) | |
tree | e542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/broker/beanstalk | |
parent | d4c92e48bada7593b6fbec612a742c599de6e736 (diff) |
- Update jobs sources
- Update Arch diagramm
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk')
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/broker.go | 185 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/broker_test.go | 276 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/config.go | 50 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/config_test.go | 47 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/conn.go | 180 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/constants.go | 6 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/consume_test.go | 242 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/durability_test.go | 575 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/job.go | 24 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/sock.bean | 0 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/stat_test.go | 66 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/tube.go | 250 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/tube_test.go | 18 |
13 files changed, 1919 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go new file mode 100644 index 00000000..dc3ea518 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/broker.go @@ -0,0 +1,185 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run consume using Broker service. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + conn *conn + tubes map[*jobs.Pipeline]*tube +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init(cfg *Config) (bool, error) { + b.cfg = cfg + b.tubes = make(map[*jobs.Pipeline]*tube) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.tubes[pipe]; ok { + return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) + } + + t, err := newTube(pipe, b.throw) + if err != nil { + return err + } + + b.tubes[pipe] = t + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.conn, err = b.cfg.newConn(); err != nil { + return err + } + defer b.conn.Close() + + for _, t := range b.tubes { + tt := t + if tt.execPool != nil { + go tt.serve(b.cfg) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, t := range b.tubes { + t.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + t.stop() + + t.execPool = execPool + t.errHandler = errHandler + + if b.conn != nil { + tt := t + if tt.execPool != nil { + go tt.serve(connFactory(b.cfg)) + } + } + + return nil +} + +// Push data into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + t := b.tube(pipe) + if t == nil { + return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + data, err := pack(j) + if err != nil { + return "", err + } + + return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + t := b.tube(pipe) + if t == nil { + return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + return t.stat(b.conn) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) tube(pipe *jobs.Pipeline) *tube { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return nil + } + + return t +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go new file mode 100644 index 00000000..cd2132af --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/broker_test.go @@ -0,0 +1,276 @@ +package beanstalk + +import ( + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + "tube": "test", + } + + cfg = &Config{ + Addr: "tcp://localhost:11300", + } +) + +func init() { + conn, err := beanstalk.Dial("tcp", "localhost:11300") + if err != nil { + panic(err) + } + defer conn.Close() + + t := beanstalk.Tube{Name: "testTube", Conn: conn} + + for { + id, _, err := t.PeekReady() + if id == 0 || err != nil { + break + } + + if err := conn.Delete(id); err != nil { + panic(err) + } + } +} + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Register_Invalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + })) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Error(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "tcp://localhost:11399", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go new file mode 100644 index 00000000..3e48a2d7 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/config.go @@ -0,0 +1,50 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "strings" + "time" +) + +// Config defines beanstalk broker configuration. +type Config struct { + // Addr of beanstalk server. + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("beanstalk address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// size creates new rpc socket Listener. +func (c *Config) newConn() (*conn, error) { + dsn := strings.Split(c.Addr, "://") + if len(dsn) != 2 { + return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") + } + + return newConn(dsn[0], dsn[1], c.TimeoutDuration()) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go new file mode 100644 index 00000000..4ba08a04 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/config_test.go @@ -0,0 +1,47 @@ +package beanstalk + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func TestConfig_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"addr":"tcp"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} + +func TestConfig_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go new file mode 100644 index 00000000..7aba6bbb --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/conn.go @@ -0,0 +1,180 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "strings" + "sync" + "time" +) + +var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} + +// creates new connections +type connFactory interface { + newConn() (*conn, error) +} + +// conn protects allocation for one connection between +// threads and provides reconnecting capabilities. +type conn struct { + tout time.Duration + conn *beanstalk.Conn + alive bool + free chan interface{} + dead chan interface{} + stop chan interface{} + lock *sync.Cond +} + +// creates new beanstalk connection and reconnect watcher. +func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { + cn = &conn{ + tout: tout, + alive: true, + free: make(chan interface{}, 1), + dead: make(chan interface{}, 1), + stop: make(chan interface{}), + lock: sync.NewCond(&sync.Mutex{}), + } + + cn.conn, err = beanstalk.Dial(network, addr) + if err != nil { + return nil, err + } + + go cn.watch(network, addr) + + return cn, nil +} + +// reset the connection and reconnect watcher. +func (cn *conn) Close() error { + cn.lock.L.Lock() + defer cn.lock.L.Unlock() + + close(cn.stop) + for cn.alive { + cn.lock.Wait() + } + + return nil +} + +// acquire connection instance or return error in case of timeout. When mandratory set to true +// timeout won't be applied. +func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { + // do not apply timeout on mandatory connections + if mandatory { + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + } + } + + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + default: + // *2 to handle commands called right after the connection reset + tout := time.NewTimer(cn.tout * 2) + select { + case <-cn.stop: + tout.Stop() + return nil, fmt.Errorf("connection closed") + case <-cn.free: + tout.Stop() + return cn.conn, nil + case <-tout.C: + return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) + } + } +} + +// release acquired connection. +func (cn *conn) release(err error) error { + if isConnError(err) { + // reconnect is required + cn.dead <- err + } else { + cn.free <- nil + } + + return err +} + +// watch and reconnect if dead +func (cn *conn) watch(network, addr string) { + cn.free <- nil + t := time.NewTicker(WatchThrottleLimit) + defer t.Stop() + for { + select { + case <-cn.dead: + // simple throttle limiter + <-t.C + // try to reconnect + // TODO add logging here + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cn.tout + + reconnect := func() error { + conn, err := beanstalk.Dial(network, addr) + if err != nil { + fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) + return err + } + + // TODO ADD LOGGING + fmt.Println("------beanstalk successfully redialed------") + + cn.conn = conn + cn.free <- nil + return nil + } + + err := backoff.Retry(reconnect, expb) + if err != nil { + fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) + cn.dead <- nil + } + + case <-cn.stop: + cn.lock.L.Lock() + select { + case <-cn.dead: + case <-cn.free: + } + + // stop underlying connection + cn.conn.Close() + cn.alive = false + cn.lock.Signal() + + cn.lock.L.Unlock() + + return + } + } +} + +// isConnError indicates that error is related to dead socket. +func isConnError(err error) bool { + if err == nil { + return false + } + + for _, errStr := range connErrors { + // golang... + if strings.Contains(err.Error(), errStr) { + return true + } + } + + return false +} diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go new file mode 100644 index 00000000..84be305e --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/constants.go @@ -0,0 +1,6 @@ +package beanstalk + +import "time" + +// WatchThrottleLimit is used to limit reconnection occurrence in watch function +const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go new file mode 100644 index 00000000..b16866ae --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/consume_test.go @@ -0,0 +1,242 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go new file mode 100644 index 00000000..499a5206 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/durability_test.go @@ -0,0 +1,575 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "tcp://localhost:11301", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:11301", + upstream: "localhost:11300", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // reoccuring + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(true) + + // auto-reconnect + _, serr = b.Stat(pipe) + assert.NoError(t, serr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go new file mode 100644 index 00000000..fd9c8c3c --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/job.go @@ -0,0 +1,24 @@ +package beanstalk + +import ( + "bytes" + "encoding/gob" + "github.com/spiral/jobs/v2" +) + +func pack(j *jobs.Job) ([]byte, error) { + b := new(bytes.Buffer) + err := gob.NewEncoder(b).Encode(j) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func unpack(data []byte) (*jobs.Job, error) { + j := &jobs.Job{} + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) + + return j, err +} diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/sock.bean diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go new file mode 100644 index 00000000..14a55859 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/stat_test.go @@ -0,0 +1,66 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // beanstalk reserves job right after push + time.Sleep(time.Millisecond * 100) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go new file mode 100644 index 00000000..9d7ad117 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/tube.go @@ -0,0 +1,250 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type tube struct { + active int32 + pipe *jobs.Pipeline + mut sync.Mutex + tube *beanstalk.Tube + tubeSet *beanstalk.TubeSet + reserve time.Duration + + // tube events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id uint64 + data []byte +} + +func (e *entry) String() string { + return fmt.Sprintf("%v", e.id) +} + +// create new tube consumer and producer +func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { + if pipe.String("tube", "") == "" { + return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") + } + + return &tube{ + pipe: pipe, + tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, + tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), + reserve: pipe.Duration("reserve", time.Second), + lsn: lsn, + }, nil +} + +// run consumers +func (t *tube) serve(connector connFactory) { + // tube specific consume connection + cn, err := connector.newConn() + if err != nil { + t.report(err) + return + } + defer cn.Close() + + t.wait = make(chan interface{}) + atomic.StoreInt32(&t.active, 1) + + for { + e, err := t.consume(cn) + if err != nil { + if isConnError(err) { + t.report(err) + } + continue + } + + if e == nil { + return + } + + h := <-t.execPool + go func(h jobs.Handler, e *entry) { + err := t.do(cn, h, e) + t.execPool <- h + t.wg.Done() + t.report(err) + }(h, e) + } +} + +// fetch consume +func (t *tube) consume(cn *conn) (*entry, error) { + t.muw.Lock() + defer t.muw.Unlock() + + select { + case <-t.wait: + return nil, nil + default: + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.tubeSet.Conn = conn + + id, data, err := t.tubeSet.Reserve(t.reserve) + cn.release(err) + + if err != nil { + return nil, err + } + + t.wg.Add(1) + return &entry{id: id, data: data}, nil + } +} + +// do data +func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { + j, err := unpack(e.data) + if err != nil { + return err + } + + err = h(e.String(), j) + + // mandatory acquisition + conn, connErr := cn.acquire(true) + if connErr != nil { + // possible if server is dead + return connErr + } + + if err == nil { + return cn.release(conn.Delete(e.id)) + } + + stat, statErr := conn.StatsJob(e.id) + if statErr != nil { + return cn.release(statErr) + } + + t.errHandler(e.String(), j, err) + + reserves, ok := strconv.Atoi(stat["reserves"]) + if ok != nil || !j.Options.CanRetry(reserves-1) { + return cn.release(conn.Bury(e.id, 0)) + } + + return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) +} + +// stop tube consuming +func (t *tube) stop() { + if atomic.LoadInt32(&t.active) == 0 { + return + } + + atomic.StoreInt32(&t.active, 0) + + close(t.wait) + + t.muw.Lock() + t.wg.Wait() + t.muw.Unlock() +} + +// put data into pool or return error (no wait), this method will try to reattempt operation if +// dead conn found. +func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + id, err = t.doPut(cn, attempt, data, delay, rrt) + if err != nil && isConnError(err) { + return t.doPut(cn, attempt, data, delay, rrt) + } + + return id, err +} + +// perform put operation +func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + conn, err := cn.acquire(false) + if err != nil { + return "", err + } + + var bid uint64 + + t.mut.Lock() + t.tube.Conn = conn + bid, err = t.tube.Put(data, 0, delay, rrt) + t.mut.Unlock() + + return strconv.FormatUint(bid, 10), cn.release(err) +} + +// return tube stats (retries) +func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { + stat, err = t.doStat(cn) + if err != nil && isConnError(err) { + return t.doStat(cn) + } + + return stat, err +} + +// return tube stats +func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.mut.Lock() + t.tube.Conn = conn + values, err := t.tube.Stats() + t.mut.Unlock() + + if err != nil { + return nil, cn.release(err) + } + + stat = &jobs.Stat{InternalName: t.tube.Name} + + if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { + stat.Queue = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { + stat.Active = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { + stat.Delayed = int64(v) + } + + return stat, cn.release(nil) +} + +// report tube specific error +func (t *tube) report(err error) { + if err != nil { + t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go new file mode 100644 index 00000000..b6a646f4 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/tube_test.go @@ -0,0 +1,18 @@ +package beanstalk + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestTube_CantServe(t *testing.T) { + var gctx interface{} + tube := &tube{ + lsn: func(event int, ctx interface{}) { + gctx = ctx + }, + } + + tube.serve(&Config{Addr: "broken"}) + assert.Error(t, gctx.(error)) +} |