diff options
author | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
commit | 1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch) | |
tree | 5abedf7306b50b02ba3892c0bc562307a62eb332 /plugins/jobs/oooold/broker | |
parent | 260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff) |
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker')
37 files changed, 0 insertions, 7072 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go deleted file mode 100644 index b47d83ee..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker.go +++ /dev/null @@ -1,216 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" -) - -// Broker represents AMQP broker. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - publish *chanPool - consume *chanPool - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures AMQP job broker (always 2 connections). -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.publish.Close() - - if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.consume.Close() - - for _, q := range b.queues { - err := q.declare(b.publish, q.name, q.key, nil) - if err != nil { - b.mu.Unlock() - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.publish, b.consume) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.publish != nil && q.execPool != nil { - if q.execPool != nil { - go q.serve(b.publish, b.consume) - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { - return "", err - } - - return id.String(), nil -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - queue, err := q.inspect(b.publish) - if err != nil { - return nil, err - } - - // this the closest approximation we can get for now - return &jobs.Stat{ - InternalName: queue.Name, - Queue: int64(queue.Messages), - Active: int64(atomic.LoadInt32(&q.running)), - }, nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go deleted file mode 100644 index 66078099..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker_test.go +++ /dev/null @@ -1,419 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "queue": "rr-queue", - "exchange": "rr-exchange", - "prefetch": 1, - } - - cfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -var ( - fanoutPipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "fanout", - "queue": "fanout-queue", - "exchange": "fanout-exchange", - "exchange-type": "fanout", - "prefetch": 1, - } - - fanoutCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_BadPipeline(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "exchange": "rr-exchange", - "prefetch": 1, - })) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_CantStart(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "amqp://guest:guest@localhost:15672/", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal() - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_Queue_RoutingKey(t *testing.T) { - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") -} - -func TestBroker_Register_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Queue_ExchangeType(t *testing.T) { - pipeWithKey := pipe.With("exchange-type", "direct") - - assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") -} - -func TestBroker_Register_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "fanout") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Register_With_WrongExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "xxx") - - assert.Error(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(fanoutCfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := fanoutPipe.With("exchange-type", "fanout") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go deleted file mode 100644 index be747776..00000000 --- a/plugins/jobs/oooold/broker/amqp/conn.go +++ /dev/null @@ -1,232 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/cenkalti/backoff/v4" - "github.com/streadway/amqp" - "sync" - "time" -) - -// manages set of AMQP channels -type chanPool struct { - // timeout to backoff redial - tout time.Duration - url string - - mu *sync.Mutex - - conn *amqp.Connection - channels map[string]*channel - wait chan interface{} - connected chan interface{} -} - -// manages single channel -type channel struct { - ch *amqp.Channel - // todo unused - //consumer string - confirm chan amqp.Confirmation - signal chan error -} - -// newConn creates new watched AMQP connection -func newConn(url string, tout time.Duration) (*chanPool, error) { - conn, err := dial(url) - if err != nil { - return nil, err - } - - cp := &chanPool{ - url: url, - tout: tout, - conn: conn, - mu: &sync.Mutex{}, - channels: make(map[string]*channel), - wait: make(chan interface{}), - connected: make(chan interface{}), - } - - close(cp.connected) - go cp.watch() - return cp, nil -} - -// dial dials to AMQP. -func dial(url string) (*amqp.Connection, error) { - return amqp.Dial(url) -} - -// Close gracefully closes all underlying channels and connection. -func (cp *chanPool) Close() error { - cp.mu.Lock() - - close(cp.wait) - if cp.channels == nil { - return fmt.Errorf("connection is dead") - } - - // close all channels and consume - var wg sync.WaitGroup - for _, ch := range cp.channels { - wg.Add(1) - - go func(ch *channel) { - defer wg.Done() - cp.closeChan(ch, nil) - }(ch) - } - cp.mu.Unlock() - - wg.Wait() - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn != nil { - return cp.conn.Close() - } - - return nil -} - -// waitConnected waits till connection is connected again or eventually closed. -// must only be invoked after connection error has been delivered to channel.signal. -func (cp *chanPool) waitConnected() chan interface{} { - cp.mu.Lock() - defer cp.mu.Unlock() - - return cp.connected -} - -// watch manages connection state and reconnects if needed -func (cp *chanPool) watch() { - for { - select { - case <-cp.wait: - // connection has been closed - return - // here we are waiting for the errors from amqp connection - case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): - cp.mu.Lock() - // clear connected, since connections are dead - cp.connected = make(chan interface{}) - - // broadcast error to all consume to let them for the tryReconnect - for _, ch := range cp.channels { - ch.signal <- err - } - - // disable channel allocation while server is dead - cp.conn = nil - cp.channels = nil - - // initialize the backoff - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cp.tout - cp.mu.Unlock() - - // reconnect function - reconnect := func() error { - cp.mu.Lock() - conn, err := dial(cp.url) - if err != nil { - // still failing - fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) - cp.mu.Unlock() - return err - } - - // TODO ADD LOGGING - fmt.Println("------amqp successfully redialed------") - - // here we are reconnected - // replace the connection - cp.conn = conn - // re-init the channels - cp.channels = make(map[string]*channel) - cp.mu.Unlock() - return nil - } - - // start backoff retry - errb := backoff.Retry(reconnect, expb) - if errb != nil { - fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) - // reconnection failed - close(cp.connected) - return - } - close(cp.connected) - } - } -} - -// channel allocates new channel on amqp connection -func (cp *chanPool) channel(name string) (*channel, error) { - cp.mu.Lock() - dead := cp.conn == nil - cp.mu.Unlock() - - if dead { - // wait for connection restoration (doubled the timeout duration) - select { - case <-time.NewTimer(cp.tout * 2).C: - return nil, fmt.Errorf("connection is dead") - case <-cp.connected: - // connected - } - } - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn == nil { - return nil, fmt.Errorf("connection has been closed") - } - - if ch, ok := cp.channels[name]; ok { - return ch, nil - } - - // we must create new channel - ch, err := cp.conn.Channel() - if err != nil { - return nil, err - } - - // Enable publish confirmations - if err = ch.Confirm(false); err != nil { - return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) - } - - // we expect that every allocated channel would have listener on signal - // this is not true only in case of pure producing channels - cp.channels[name] = &channel{ - ch: ch, - confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), - signal: make(chan error, 1), - } - - return cp.channels[name], nil -} - -// closeChan gracefully closes and removes channel allocation. -func (cp *chanPool) closeChan(c *channel, err error) error { - cp.mu.Lock() - defer cp.mu.Unlock() - - go func() { - c.signal <- nil - c.ch.Close() - }() - - for name, ch := range cp.channels { - if ch == c { - delete(cp.channels, name) - } - } - - return err -} diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go deleted file mode 100644 index 28999c36..00000000 --- a/plugins/jobs/oooold/broker/amqp/consume_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - start := time.Now() - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 3*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go deleted file mode 100644 index 00d62c51..00000000 --- a/plugins/jobs/oooold/broker/amqp/durability_test.go +++ /dev/null @@ -1,728 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5673/", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:5673", - upstream: "localhost:5672", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2_2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // start when connection is dead - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(false) - - _, serr := b.Stat(pipe) - assert.Error(t, serr) - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - - if j.Payload == "kill" && len(done) == 0 { - proxy.reset(true) - } - - mu.Lock() - defer mu.Unlock() - done[id] = true - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go deleted file mode 100644 index bd559715..00000000 --- a/plugins/jobs/oooold/broker/amqp/job.go +++ /dev/null @@ -1,56 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" -) - -// pack job metadata into headers -func pack(id string, attempt int, j *jobs.Job) amqp.Table { - return amqp.Table{ - "rr-id": id, - "rr-job": j.Job, - "rr-attempt": int64(attempt), - "rr-maxAttempts": int64(j.Options.Attempts), - "rr-timeout": int64(j.Options.Timeout), - "rr-delay": int64(j.Options.Delay), - "rr-retryDelay": int64(j.Options.RetryDelay), - } -} - -// unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { - j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} - - if _, ok := d.Headers["rr-id"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") - } - - if _, ok := d.Headers["rr-attempt"].(int64); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") - } - - if _, ok := d.Headers["rr-job"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") - } - j.Job = d.Headers["rr-job"].(string) - - if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { - j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) - } - - if _, ok := d.Headers["rr-timeout"].(int64); ok { - j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) - } - - if _, ok := d.Headers["rr-delay"].(int64); ok { - j.Options.Delay = int(d.Headers["rr-delay"].(int64)) - } - - if _, ok := d.Headers["rr-retryDelay"].(int64); ok { - j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) - } - - return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil -} diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go deleted file mode 100644 index 24ca453b..00000000 --- a/plugins/jobs/oooold/broker/amqp/job_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package amqp - -import ( - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack_Errors(t *testing.T) { - _, _, _, err := unpack(amqp.Delivery{ - Headers: map[string]interface{}{}, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - }, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - "rr-attempt": int64(0), - }, - }) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go deleted file mode 100644 index 6ef5f20f..00000000 --- a/plugins/jobs/oooold/broker/amqp/queue.go +++ /dev/null @@ -1,302 +0,0 @@ -package amqp - -import ( - "errors" - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" - "os" - "sync" - "sync/atomic" - "time" -) - -type ExchangeType string - -const ( - Direct ExchangeType = "direct" - Fanout ExchangeType = "fanout" - Topic ExchangeType = "topic" - Headers ExchangeType = "headers" -) - -func (et ExchangeType) IsValid() error { - switch et { - case Direct, Fanout, Topic, Headers: - return nil - } - return errors.New("unknown exchange-type") -} - -func (et ExchangeType) String() string { - switch et { - case Direct, Fanout, Topic, Headers: - return string(et) - default: - return "direct" - } -} - - -type queue struct { - active int32 - pipe *jobs.Pipeline - exchange string - exchangeType ExchangeType - name, key string - consumer string - - // active consuming channel - muc sync.Mutex - cc *channel - - // queue events - lsn func(event int, ctx interface{}) - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - running int32 - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -// newQueue creates new queue wrapper for AMQP. -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") - } - - exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) - - err := exchangeType.IsValid() - if err != nil { - return nil, fmt.Errorf(err.Error()) - } - - return &queue{ - exchange: pipe.String("exchange", "amqp.direct"), - exchangeType: exchangeType, - name: pipe.String("queue", ""), - key: pipe.String("routing-key", pipe.String("queue", "")), - consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), - pipe: pipe, - lsn: lsn, - }, nil -} - -// serve consumes queue -func (q *queue) serve(publish, consume *chanPool) { - atomic.StoreInt32(&q.active, 1) - - for { - <-consume.waitConnected() - if atomic.LoadInt32(&q.active) == 0 { - // stopped - return - } - - delivery, cc, err := q.consume(consume) - if err != nil { - q.report(err) - continue - } - - q.muc.Lock() - q.cc = cc - q.muc.Unlock() - - for d := range delivery { - q.muw.Lock() - q.wg.Add(1) - q.muw.Unlock() - - atomic.AddInt32(&q.running, 1) - h := <-q.execPool - - go func(h jobs.Handler, d amqp.Delivery) { - err := q.do(publish, h, d) - - atomic.AddInt32(&q.running, ^int32(0)) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, d) - } - } -} - -func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { - // allocate channel for the consuming - if cc, err = consume.channel(q.name); err != nil { - return nil, nil, err - } - - if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) - if err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - // do i like it? - go func(consume *chanPool) { - for err := range cc.signal { - consume.closeChan(cc, err) - return - } - }(consume) - - return delivery, cc, err -} - -func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { - id, attempt, j, err := unpack(d) - if err != nil { - q.report(err) - return d.Nack(false, false) - } - err = h(id, j) - - if err == nil { - return d.Ack(false) - } - - // failed - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return d.Nack(false, false) - } - - // retry as new j (to accommodate attempt number and new delay) - if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { - q.report(err) - return d.Nack(false, true) - } - - return d.Ack(false) -} - -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - q.muc.Lock() - if q.cc != nil { - // gracefully stopped consuming - q.report(q.cc.ch.Cancel(q.consumer, true)) - } - q.muc.Unlock() - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// publish message to queue or to delayed queue. -func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - qKey := q.key - - if delay != 0 { - delayMs := int64(delay.Seconds() * 1000) - qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) - qKey = qName - - err := q.declare(cp, qName, qName, amqp.Table{ - "x-dead-letter-exchange": q.exchange, - "x-dead-letter-routing-key": q.name, - "x-message-ttl": delayMs, - "x-expires": delayMs * 2, - }) - - if err != nil { - return err - } - } - - err = c.ch.Publish( - q.exchange, // exchange - qKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/octet-stream", - Body: j.Body(), - DeliveryMode: amqp.Persistent, - Headers: pack(id, attempt, j), - }, - ) - - if err != nil { - return cp.closeChan(c, err) - } - - confirmed, ok := <-c.confirm - if ok && confirmed.Ack { - return nil - } - - return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) -} - -// declare queue and binding to it -func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) - if err != nil { - return cp.closeChan(c, err) - } - - err = c.ch.QueueBind(queue, key, q.exchange, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - // keep channel open - return err -} - -// inspect the queue -func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { - c, err := cp.channel("stat") - if err != nil { - return nil, err - } - - queue, err := c.ch.QueueInspect(q.name) - if err != nil { - return nil, cp.closeChan(c, err) - } - - // keep channel open - return &queue, err -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go deleted file mode 100644 index ef19746c..00000000 --- a/plugins/jobs/oooold/broker/amqp/stat_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "sync" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - wg := &sync.WaitGroup{} - wg.Add(1) - exec <- func(id string, j *jobs.Job) error { - defer wg.Done() - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - return nil - } - - wg.Wait() - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go deleted file mode 100644 index dc3ea518..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker.go +++ /dev/null @@ -1,185 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run consume using Broker service. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - conn *conn - tubes map[*jobs.Pipeline]*tube -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init(cfg *Config) (bool, error) { - b.cfg = cfg - b.tubes = make(map[*jobs.Pipeline]*tube) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.tubes[pipe]; ok { - return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) - } - - t, err := newTube(pipe, b.throw) - if err != nil { - return err - } - - b.tubes[pipe] = t - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.conn, err = b.cfg.newConn(); err != nil { - return err - } - defer b.conn.Close() - - for _, t := range b.tubes { - tt := t - if tt.execPool != nil { - go tt.serve(b.cfg) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, t := range b.tubes { - t.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - t.stop() - - t.execPool = execPool - t.errHandler = errHandler - - if b.conn != nil { - tt := t - if tt.execPool != nil { - go tt.serve(connFactory(b.cfg)) - } - } - - return nil -} - -// Push data into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - t := b.tube(pipe) - if t == nil { - return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - data, err := pack(j) - if err != nil { - return "", err - } - - return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - t := b.tube(pipe) - if t == nil { - return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) - } - - return t.stat(b.conn) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) tube(pipe *jobs.Pipeline) *tube { - b.mu.Lock() - defer b.mu.Unlock() - - t, ok := b.tubes[pipe] - if !ok { - return nil - } - - return t -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go deleted file mode 100644 index cd2132af..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go +++ /dev/null @@ -1,276 +0,0 @@ -package beanstalk - -import ( - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - "tube": "test", - } - - cfg = &Config{ - Addr: "tcp://localhost:11300", - } -) - -func init() { - conn, err := beanstalk.Dial("tcp", "localhost:11300") - if err != nil { - panic(err) - } - defer conn.Close() - - t := beanstalk.Tube{Name: "testTube", Conn: conn} - - for { - id, _, err := t.PeekReady() - if id == 0 || err != nil { - break - } - - if err := conn.Delete(id); err != nil { - panic(err) - } - } -} - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Register_Invalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "beanstalk", - "name": "default", - })) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Error(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "tcp://localhost:11399", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go deleted file mode 100644 index 3e48a2d7..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config.go +++ /dev/null @@ -1,50 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/roadrunner/service" - "strings" - "time" -) - -// Config defines beanstalk broker configuration. -type Config struct { - // Addr of beanstalk server. - Addr string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Addr == "" { - return fmt.Errorf("beanstalk address is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// size creates new rpc socket Listener. -func (c *Config) newConn() (*conn, error) { - dsn := strings.Split(c.Addr, "://") - if len(dsn) != 2 { - return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") - } - - return newConn(dsn[0], dsn[1], c.TimeoutDuration()) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go deleted file mode 100644 index 4ba08a04..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/config_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package beanstalk - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func TestConfig_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"addr":""}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func TestConfig_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"addr":"tcp"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} - -func TestConfig_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - _, err := c.newConn() - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go deleted file mode 100644 index 7aba6bbb..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/conn.go +++ /dev/null @@ -1,180 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "strings" - "sync" - "time" -) - -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} - -// creates new connections -type connFactory interface { - newConn() (*conn, error) -} - -// conn protects allocation for one connection between -// threads and provides reconnecting capabilities. -type conn struct { - tout time.Duration - conn *beanstalk.Conn - alive bool - free chan interface{} - dead chan interface{} - stop chan interface{} - lock *sync.Cond -} - -// creates new beanstalk connection and reconnect watcher. -func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { - cn = &conn{ - tout: tout, - alive: true, - free: make(chan interface{}, 1), - dead: make(chan interface{}, 1), - stop: make(chan interface{}), - lock: sync.NewCond(&sync.Mutex{}), - } - - cn.conn, err = beanstalk.Dial(network, addr) - if err != nil { - return nil, err - } - - go cn.watch(network, addr) - - return cn, nil -} - -// reset the connection and reconnect watcher. -func (cn *conn) Close() error { - cn.lock.L.Lock() - defer cn.lock.L.Unlock() - - close(cn.stop) - for cn.alive { - cn.lock.Wait() - } - - return nil -} - -// acquire connection instance or return error in case of timeout. When mandratory set to true -// timeout won't be applied. -func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { - // do not apply timeout on mandatory connections - if mandatory { - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - } - } - - select { - case <-cn.stop: - return nil, fmt.Errorf("connection closed") - case <-cn.free: - return cn.conn, nil - default: - // *2 to handle commands called right after the connection reset - tout := time.NewTimer(cn.tout * 2) - select { - case <-cn.stop: - tout.Stop() - return nil, fmt.Errorf("connection closed") - case <-cn.free: - tout.Stop() - return cn.conn, nil - case <-tout.C: - return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) - } - } -} - -// release acquired connection. -func (cn *conn) release(err error) error { - if isConnError(err) { - // reconnect is required - cn.dead <- err - } else { - cn.free <- nil - } - - return err -} - -// watch and reconnect if dead -func (cn *conn) watch(network, addr string) { - cn.free <- nil - t := time.NewTicker(WatchThrottleLimit) - defer t.Stop() - for { - select { - case <-cn.dead: - // simple throttle limiter - <-t.C - // try to reconnect - // TODO add logging here - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cn.tout - - reconnect := func() error { - conn, err := beanstalk.Dial(network, addr) - if err != nil { - fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) - return err - } - - // TODO ADD LOGGING - fmt.Println("------beanstalk successfully redialed------") - - cn.conn = conn - cn.free <- nil - return nil - } - - err := backoff.Retry(reconnect, expb) - if err != nil { - fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) - cn.dead <- nil - } - - case <-cn.stop: - cn.lock.L.Lock() - select { - case <-cn.dead: - case <-cn.free: - } - - // stop underlying connection - cn.conn.Close() - cn.alive = false - cn.lock.Signal() - - cn.lock.L.Unlock() - - return - } - } -} - -// isConnError indicates that error is related to dead socket. -func isConnError(err error) bool { - if err == nil { - return false - } - - for _, errStr := range connErrors { - // golang... - if strings.Contains(err.Error(), errStr) { - return true - } - } - - return false -} diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go deleted file mode 100644 index 84be305e..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/constants.go +++ /dev/null @@ -1,6 +0,0 @@ -package beanstalk - -import "time" - -// WatchThrottleLimit is used to limit reconnection occurrence in watch function -const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go deleted file mode 100644 index b16866ae..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go +++ /dev/null @@ -1,242 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go deleted file mode 100644 index 499a5206..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go +++ /dev/null @@ -1,575 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "tcp://localhost:11301", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:11301", - upstream: "localhost:11300", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // reoccuring - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(true) - - // auto-reconnect - _, serr = b.Stat(pipe) - assert.NoError(t, serr) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(pipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - done[id] = true - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - st, err := b.Stat(pipe) - if err != nil { - continue - } - - // wait till pipeline is empty - if st.Queue+st.Active == 0 { - return - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go deleted file mode 100644 index fd9c8c3c..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/job.go +++ /dev/null @@ -1,24 +0,0 @@ -package beanstalk - -import ( - "bytes" - "encoding/gob" - "github.com/spiral/jobs/v2" -) - -func pack(j *jobs.Job) ([]byte, error) { - b := new(bytes.Buffer) - err := gob.NewEncoder(b).Encode(j) - if err != nil { - return nil, err - } - - return b.Bytes(), nil -} - -func unpack(data []byte) (*jobs.Job, error) { - j := &jobs.Job{} - err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) - - return j, err -} diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean deleted file mode 100644 index e69de29b..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/sock.bean +++ /dev/null diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go deleted file mode 100644 index 14a55859..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package beanstalk - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // beanstalk reserves job right after push - time.Sleep(time.Millisecond * 100) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go deleted file mode 100644 index 9d7ad117..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube.go +++ /dev/null @@ -1,250 +0,0 @@ -package beanstalk - -import ( - "fmt" - "github.com/beanstalkd/go-beanstalk" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type tube struct { - active int32 - pipe *jobs.Pipeline - mut sync.Mutex - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet - reserve time.Duration - - // tube events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id uint64 - data []byte -} - -func (e *entry) String() string { - return fmt.Sprintf("%v", e.id) -} - -// create new tube consumer and producer -func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { - if pipe.String("tube", "") == "" { - return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") - } - - return &tube{ - pipe: pipe, - tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, - tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), - reserve: pipe.Duration("reserve", time.Second), - lsn: lsn, - }, nil -} - -// run consumers -func (t *tube) serve(connector connFactory) { - // tube specific consume connection - cn, err := connector.newConn() - if err != nil { - t.report(err) - return - } - defer cn.Close() - - t.wait = make(chan interface{}) - atomic.StoreInt32(&t.active, 1) - - for { - e, err := t.consume(cn) - if err != nil { - if isConnError(err) { - t.report(err) - } - continue - } - - if e == nil { - return - } - - h := <-t.execPool - go func(h jobs.Handler, e *entry) { - err := t.do(cn, h, e) - t.execPool <- h - t.wg.Done() - t.report(err) - }(h, e) - } -} - -// fetch consume -func (t *tube) consume(cn *conn) (*entry, error) { - t.muw.Lock() - defer t.muw.Unlock() - - select { - case <-t.wait: - return nil, nil - default: - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.tubeSet.Conn = conn - - id, data, err := t.tubeSet.Reserve(t.reserve) - cn.release(err) - - if err != nil { - return nil, err - } - - t.wg.Add(1) - return &entry{id: id, data: data}, nil - } -} - -// do data -func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { - j, err := unpack(e.data) - if err != nil { - return err - } - - err = h(e.String(), j) - - // mandatory acquisition - conn, connErr := cn.acquire(true) - if connErr != nil { - // possible if server is dead - return connErr - } - - if err == nil { - return cn.release(conn.Delete(e.id)) - } - - stat, statErr := conn.StatsJob(e.id) - if statErr != nil { - return cn.release(statErr) - } - - t.errHandler(e.String(), j, err) - - reserves, ok := strconv.Atoi(stat["reserves"]) - if ok != nil || !j.Options.CanRetry(reserves-1) { - return cn.release(conn.Bury(e.id, 0)) - } - - return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) -} - -// stop tube consuming -func (t *tube) stop() { - if atomic.LoadInt32(&t.active) == 0 { - return - } - - atomic.StoreInt32(&t.active, 0) - - close(t.wait) - - t.muw.Lock() - t.wg.Wait() - t.muw.Unlock() -} - -// put data into pool or return error (no wait), this method will try to reattempt operation if -// dead conn found. -func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - id, err = t.doPut(cn, attempt, data, delay, rrt) - if err != nil && isConnError(err) { - return t.doPut(cn, attempt, data, delay, rrt) - } - - return id, err -} - -// perform put operation -func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { - conn, err := cn.acquire(false) - if err != nil { - return "", err - } - - var bid uint64 - - t.mut.Lock() - t.tube.Conn = conn - bid, err = t.tube.Put(data, 0, delay, rrt) - t.mut.Unlock() - - return strconv.FormatUint(bid, 10), cn.release(err) -} - -// return tube stats (retries) -func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { - stat, err = t.doStat(cn) - if err != nil && isConnError(err) { - return t.doStat(cn) - } - - return stat, err -} - -// return tube stats -func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { - conn, err := cn.acquire(false) - if err != nil { - return nil, err - } - - t.mut.Lock() - t.tube.Conn = conn - values, err := t.tube.Stats() - t.mut.Unlock() - - if err != nil { - return nil, cn.release(err) - } - - stat = &jobs.Stat{InternalName: t.tube.Name} - - if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { - stat.Queue = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { - stat.Active = int64(v) - } - - if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { - stat.Delayed = int64(v) - } - - return stat, cn.release(nil) -} - -// report tube specific error -func (t *tube) report(err error) { - if err != nil { - t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go deleted file mode 100644 index b6a646f4..00000000 --- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package beanstalk - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestTube_CantServe(t *testing.T) { - var gctx interface{} - tube := &tube{ - lsn: func(event int, ctx interface{}) { - gctx = ctx - }, - } - - tube.serve(&Config{Addr: "broken"}) - assert.Error(t, gctx.(error)) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go deleted file mode 100644 index 385bb175..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker.go +++ /dev/null @@ -1,174 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run queue using local goroutines. -type Broker struct { - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init() (bool, error) { - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() error { - // start consuming - b.mu.Lock() - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve() - } - } - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - // stop all consuming - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.wait != nil { - if q.execPool != nil { - go q.serve() - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q.push(id.String(), j, 0, j.Options.DelayDuration()) - - return id.String(), nil -} - -// Stat must consume statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(), nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go deleted file mode 100644 index c1b40276..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go +++ /dev/null @@ -1,221 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "local", - "name": "default", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init() - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - b.Init() - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go deleted file mode 100644 index d764a984..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go +++ /dev/null @@ -1,253 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go deleted file mode 100644 index a24bc216..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/queue.go +++ /dev/null @@ -1,161 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - on int32 - state *jobs.Stat - - // job pipeline - concurPool chan interface{} - jobs chan *entry - - // on operations - muw sync.Mutex - wg sync.WaitGroup - - // stop channel - wait chan interface{} - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id string - job *jobs.Job - attempt int -} - -// create new queue -func newQueue(maxConcur int) *queue { - q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} - - if maxConcur != 0 { - q.concurPool = make(chan interface{}, maxConcur) - for i := 0; i < maxConcur; i++ { - q.concurPool <- nil - } - } - - return q -} - -// serve consumers -func (q *queue) serve() { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.on, 1) - - for { - e := q.consume() - if e == nil { - q.wg.Wait() - return - } - - if q.concurPool != nil { - <-q.concurPool - } - - atomic.AddInt64(&q.state.Active, 1) - h := <-q.execPool - - go func(h jobs.Handler, e *entry) { - defer q.wg.Done() - - q.do(h, e) - atomic.AddInt64(&q.state.Active, ^int64(0)) - - q.execPool <- h - - if q.concurPool != nil { - q.concurPool <- nil - } - }(h, e) - } -} - -// allocate one job entry -func (q *queue) consume() *entry { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil - case e := <-q.jobs: - q.wg.Add(1) - - return e - } -} - -// do singe job -func (q *queue) do(h jobs.Handler, e *entry) { - err := h(e.id, e.job) - - if err == nil { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.errHandler(e.id, e.job, err) - - if !e.job.Options.CanRetry(e.attempt) { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.on) == 0 { - return - } - - close(q.wait) - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() - - atomic.StoreInt32(&q.on, 0) -} - -// add job to the queue -func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { - if delay == 0 { - atomic.AddInt64(&q.state.Queue, 1) - go func() { - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() - - return - } - - atomic.AddInt64(&q.state.Delayed, 1) - go func() { - time.Sleep(delay) - atomic.AddInt64(&q.state.Delayed, ^int64(0)) - atomic.AddInt64(&q.state.Queue, 1) - - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() -} - -func (q *queue) stat() *jobs.Stat { - return &jobs.Stat{ - InternalName: ":memory:", - Queue: atomic.LoadInt64(&q.state.Queue), - Active: atomic.LoadInt64(&q.state.Active), - Delayed: atomic.LoadInt64(&q.state.Delayed), - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go deleted file mode 100644 index 0894323c..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go deleted file mode 100644 index 8cc62b6b..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker.go +++ /dev/null @@ -1,189 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker represents SQS broker. -type Broker struct { - cfg *Config - sqs *sqs.SQS - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures SQS broker. -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - b.sqs, err = b.cfg.SQS() - if err != nil { - return err - } - - for _, q := range b.queues { - q.url, err = q.declareQueue(b.sqs) - if err != nil { - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.sqs, b.cfg.TimeoutDuration()) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - b.wait <- nil - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.sqs != nil && q.execPool != nil { - go q.serve(b.sqs, b.cfg.TimeoutDuration()) - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { - return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) - } - - return q.send(b.sqs, j) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(b.sqs) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go deleted file mode 100644 index c87b302d..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } - - cfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9324", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_RegisterInvalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "sqs", - "name": "default", - })) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "invalid", - "declare": map[string]interface{}{ - "VisibilityTimeout": "invalid", - }, - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - assert.Error(t, b.Serve()) -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go deleted file mode 100644 index d0c2f2b2..00000000 --- a/plugins/jobs/oooold/broker/sqs/config.go +++ /dev/null @@ -1,82 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/roadrunner/service" - "time" -) - -// Config defines sqs broker configuration. -type Config struct { - // Region defined SQS region, not required when endpoint is not empty. - Region string - - // Region defined AWS API key, not required when endpoint is not empty. - Key string - - // Region defined AWS API secret, not required when endpoint is not empty. - Secret string - - // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. - Endpoint string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Region == "" { - return fmt.Errorf("SQS region is missing") - } - - if c.Key == "" { - return fmt.Errorf("SQS key is missing") - } - - if c.Secret == "" { - return fmt.Errorf("SQS secret is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// Session returns new AWS session. -func (c *Config) Session() (*session.Session, error) { - return session.NewSession(&aws.Config{ - Region: aws.String(c.Region), - Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), - }) -} - -// SQS returns new SQS instance or error. -func (c *Config) SQS() (*sqs.SQS, error) { - sess, err := c.Session() - if err != nil { - return nil, err - } - - if c.Endpoint == "" { - return sqs.New(sess), nil - } - - return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil -} diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go deleted file mode 100644 index b36b3c6f..00000000 --- a/plugins/jobs/oooold/broker/sqs/config_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package sqs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error5(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go deleted file mode 100644 index 434fc6ea..00000000 --- a/plugins/jobs/oooold/broker/sqs/consume_test.go +++ /dev/null @@ -1,370 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_PushTooBigDelay(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - Delay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - RetryDelay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled - b.Stop() -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go deleted file mode 100644 index 58ddf4b9..00000000 --- a/plugins/jobs/oooold/broker/sqs/durability_test.go +++ /dev/null @@ -1,588 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9325", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:9325", - upstream: "localhost:9324", - accept: true, - } - - proxyPipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "lockReserved": 1, - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(false) - - _, serr = b.Stat(proxyPipe) - assert.Error(t, serr) - - proxy.reset(true) - - _, serr = b.Stat(proxyPipe) - assert.NoError(t, serr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(1).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go deleted file mode 100644 index 50e2c164..00000000 --- a/plugins/jobs/oooold/broker/sqs/job.go +++ /dev/null @@ -1,80 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "time" -) - -var jobAttributes = []*string{ - aws.String("rr-job"), - aws.String("rr-maxAttempts"), - aws.String("rr-delay"), - aws.String("rr-timeout"), - aws.String("rr-retryDelay"), -} - -// pack job metadata into headers -func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { - return &sqs.SendMessageInput{ - QueueUrl: url, - DelaySeconds: aws.Int64(int64(j.Options.Delay)), - MessageBody: aws.String(j.Payload), - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, - "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, - "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, - "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, - "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, - }, - } -} - -// unpack restores jobs.Options -func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { - if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { - return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") - } - attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) - - for _, attr := range jobAttributes { - if _, ok := msg.MessageAttributes[*attr]; !ok { - return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) - } - } - - j = &jobs.Job{ - Job: *msg.MessageAttributes["rr-job"].StringValue, - Payload: *msg.Body, - Options: &jobs.Options{}, - } - - if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { - j.Options.Delay = delay - } - - if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { - j.Options.Attempts = maxAttempts - } - - if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { - j.Options.Timeout = timeout - } - - if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { - j.Options.RetryDelay = retryDelay - } - - return *msg.MessageId, attempt - 1, j, nil -} - -func awsString(n int) *string { - return aws.String(strconv.Itoa(n)) -} - -func awsDuration(d time.Duration) *string { - return aws.String(strconv.Itoa(int(d.Seconds()))) -} diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go deleted file mode 100644 index a120af53..00000000 --- a/plugins/jobs/oooold/broker/sqs/job_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package sqs - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack(t *testing.T) { - msg := &sqs.Message{ - Body: aws.String("body"), - Attributes: map[string]*string{}, - MessageAttributes: map[string]*sqs.MessageAttributeValue{}, - } - - _, _, _, err := unpack(msg) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go deleted file mode 100644 index 8a92448e..00000000 --- a/plugins/jobs/oooold/broker/sqs/queue.go +++ /dev/null @@ -1,266 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - active int32 - pipe *jobs.Pipeline - url *string - reserve time.Duration - lockReserved time.Duration - - // queue events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) - } - - return &queue{ - pipe: pipe, - reserve: pipe.Duration("reserve", time.Second), - lockReserved: pipe.Duration("lockReserved", 300*time.Second), - lsn: lsn, - }, nil -} - -// declareQueue declared queue -func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { - attr := make(map[string]*string) - for k, v := range q.pipe.Map("declare") { - if vs, ok := v.(string); ok { - attr[k] = aws.String(vs) - } - - if vi, ok := v.(int); ok { - attr[k] = aws.String(strconv.Itoa(vi)) - } - - if vb, ok := v.(bool); ok { - if vb { - attr[k] = aws.String("true") - } else { - attr[k] = aws.String("false") - } - } - } - - if len(attr) != 0 { - r, err := s.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(q.pipe.String("queue", "")), - Attributes: attr, - }) - - return r.QueueUrl, err - } - - // no need to create (get existed) - r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) - if err != nil { - return nil, err - } - - return r.QueueUrl, nil -} - -// serve consumers -func (q *queue) serve(s *sqs.SQS, tout time.Duration) { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.active, 1) - - var errored bool - for { - messages, stop, err := q.consume(s) - if err != nil { - if errored { - // reoccurring error - time.Sleep(tout) - } else { - errored = true - q.report(err) - } - - continue - } - errored = false - - if stop { - return - } - - for _, msg := range messages { - h := <-q.execPool - go func(h jobs.Handler, msg *sqs.Message) { - err := q.do(s, h, msg) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, msg) - } - } -} - -// consume and allocate connection. -func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil, true, nil - default: - r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ - QueueUrl: q.url, - MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), - WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), - VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), - AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, - MessageAttributeNames: jobAttributes, - }) - if err != nil { - return nil, false, err - } - - q.wg.Add(len(r.Messages)) - - return r.Messages, false, nil - } -} - -// do single message -func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { - id, attempt, j, err := unpack(msg) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - // block the job based on known timeout - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), - }) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - err = h(id, j) - if err == nil { - return q.deleteMessage(s, msg, nil) - } - - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return q.deleteMessage(s, msg, err) - } - - // retry after specified duration - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), - }) - - return err -} - -func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { - _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) - return drr -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - close(q.wait) - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// add job to the queue -func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { - r, err := s.SendMessage(pack(q.url, j)) - if err != nil { - return "", err - } - - return *r.MessageId, nil -} - -// return queue stats -func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { - r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ - QueueUrl: q.url, - AttributeNames: []*string{ - aws.String("ApproximateNumberOfMessages"), - aws.String("ApproximateNumberOfMessagesDelayed"), - aws.String("ApproximateNumberOfMessagesNotVisible"), - }, - }) - - if err != nil { - return nil, err - } - - stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} - - for a, v := range r.Attributes { - if a == "ApproximateNumberOfMessages" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Queue = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesNotVisible" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Active = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesDelayed" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Delayed = int64(v) - } - } - } - - return stat, nil -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go deleted file mode 100644 index 5031571b..00000000 --- a/plugins/jobs/oooold/broker/sqs/stat_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // unable to use approximated stats - _, err = b.Stat(pipe) - assert.NoError(t, err) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - _, err := b.Stat(pipe) - assert.NoError(t, err) - - close(waitJob) - return nil - } - - <-waitJob - _, err = b.Stat(pipe) - assert.NoError(t, err) -} |