summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
committerValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
commit1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch)
tree5abedf7306b50b02ba3892c0bc562307a62eb332 /plugins/jobs/oooold/broker
parent260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff)
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker_test.go419
-rw-r--r--plugins/jobs/oooold/broker/amqp/conn.go232
-rw-r--r--plugins/jobs/oooold/broker/amqp/consume_test.go258
-rw-r--r--plugins/jobs/oooold/broker/amqp/durability_test.go728
-rw-r--r--plugins/jobs/oooold/broker/amqp/job.go56
-rw-r--r--plugins/jobs/oooold/broker/amqp/job_test.go29
-rw-r--r--plugins/jobs/oooold/broker/amqp/queue.go302
-rw-r--r--plugins/jobs/oooold/broker/amqp/stat_test.go63
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker.go185
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker_test.go276
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config.go50
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config_test.go47
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/conn.go180
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/constants.go6
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/consume_test.go242
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/durability_test.go575
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/job.go24
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/sock.bean0
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/stat_test.go66
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube.go250
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube_test.go18
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker.go174
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker_test.go221
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/consume_test.go253
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/queue.go161
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/stat_test.go64
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker.go189
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker_test.go275
-rw-r--r--plugins/jobs/oooold/broker/sqs/config.go82
-rw-r--r--plugins/jobs/oooold/broker/sqs/config_test.go48
-rw-r--r--plugins/jobs/oooold/broker/sqs/consume_test.go370
-rw-r--r--plugins/jobs/oooold/broker/sqs/durability_test.go588
-rw-r--r--plugins/jobs/oooold/broker/sqs/job.go80
-rw-r--r--plugins/jobs/oooold/broker/sqs/job_test.go19
-rw-r--r--plugins/jobs/oooold/broker/sqs/queue.go266
-rw-r--r--plugins/jobs/oooold/broker/sqs/stat_test.go60
37 files changed, 0 insertions, 7072 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
deleted file mode 100644
index b47d83ee..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker.go
+++ /dev/null
@@ -1,216 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
-)
-
-// Broker represents AMQP broker.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- publish *chanPool
- consume *chanPool
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures AMQP job broker (always 2 connections).
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.publish.Close()
-
- if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.consume.Close()
-
- for _, q := range b.queues {
- err := q.declare(b.publish, q.name, q.key, nil)
- if err != nil {
- b.mu.Unlock()
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.publish, b.consume)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.publish != nil && q.execPool != nil {
- if q.execPool != nil {
- go q.serve(b.publish, b.consume)
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
- return "", err
- }
-
- return id.String(), nil
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- queue, err := q.inspect(b.publish)
- if err != nil {
- return nil, err
- }
-
- // this the closest approximation we can get for now
- return &jobs.Stat{
- InternalName: queue.Name,
- Queue: int64(queue.Messages),
- Active: int64(atomic.LoadInt32(&q.running)),
- }, nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go
deleted file mode 100644
index 66078099..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker_test.go
+++ /dev/null
@@ -1,419 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "queue": "rr-queue",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }
-
- cfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-var (
- fanoutPipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "fanout",
- "queue": "fanout-queue",
- "exchange": "fanout-exchange",
- "exchange-type": "fanout",
- "prefetch": 1,
- }
-
- fanoutCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_BadPipeline(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_CantStart(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "amqp://guest:guest@localhost:15672/",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal()
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_Queue_RoutingKey(t *testing.T) {
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
-}
-
-func TestBroker_Register_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Queue_ExchangeType(t *testing.T) {
- pipeWithKey := pipe.With("exchange-type", "direct")
-
- assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
-}
-
-func TestBroker_Register_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "fanout")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "xxx")
-
- assert.Error(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(fanoutCfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go
deleted file mode 100644
index be747776..00000000
--- a/plugins/jobs/oooold/broker/amqp/conn.go
+++ /dev/null
@@ -1,232 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/cenkalti/backoff/v4"
- "github.com/streadway/amqp"
- "sync"
- "time"
-)
-
-// manages set of AMQP channels
-type chanPool struct {
- // timeout to backoff redial
- tout time.Duration
- url string
-
- mu *sync.Mutex
-
- conn *amqp.Connection
- channels map[string]*channel
- wait chan interface{}
- connected chan interface{}
-}
-
-// manages single channel
-type channel struct {
- ch *amqp.Channel
- // todo unused
- //consumer string
- confirm chan amqp.Confirmation
- signal chan error
-}
-
-// newConn creates new watched AMQP connection
-func newConn(url string, tout time.Duration) (*chanPool, error) {
- conn, err := dial(url)
- if err != nil {
- return nil, err
- }
-
- cp := &chanPool{
- url: url,
- tout: tout,
- conn: conn,
- mu: &sync.Mutex{},
- channels: make(map[string]*channel),
- wait: make(chan interface{}),
- connected: make(chan interface{}),
- }
-
- close(cp.connected)
- go cp.watch()
- return cp, nil
-}
-
-// dial dials to AMQP.
-func dial(url string) (*amqp.Connection, error) {
- return amqp.Dial(url)
-}
-
-// Close gracefully closes all underlying channels and connection.
-func (cp *chanPool) Close() error {
- cp.mu.Lock()
-
- close(cp.wait)
- if cp.channels == nil {
- return fmt.Errorf("connection is dead")
- }
-
- // close all channels and consume
- var wg sync.WaitGroup
- for _, ch := range cp.channels {
- wg.Add(1)
-
- go func(ch *channel) {
- defer wg.Done()
- cp.closeChan(ch, nil)
- }(ch)
- }
- cp.mu.Unlock()
-
- wg.Wait()
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn != nil {
- return cp.conn.Close()
- }
-
- return nil
-}
-
-// waitConnected waits till connection is connected again or eventually closed.
-// must only be invoked after connection error has been delivered to channel.signal.
-func (cp *chanPool) waitConnected() chan interface{} {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- return cp.connected
-}
-
-// watch manages connection state and reconnects if needed
-func (cp *chanPool) watch() {
- for {
- select {
- case <-cp.wait:
- // connection has been closed
- return
- // here we are waiting for the errors from amqp connection
- case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
- cp.mu.Lock()
- // clear connected, since connections are dead
- cp.connected = make(chan interface{})
-
- // broadcast error to all consume to let them for the tryReconnect
- for _, ch := range cp.channels {
- ch.signal <- err
- }
-
- // disable channel allocation while server is dead
- cp.conn = nil
- cp.channels = nil
-
- // initialize the backoff
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cp.tout
- cp.mu.Unlock()
-
- // reconnect function
- reconnect := func() error {
- cp.mu.Lock()
- conn, err := dial(cp.url)
- if err != nil {
- // still failing
- fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
- cp.mu.Unlock()
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------amqp successfully redialed------")
-
- // here we are reconnected
- // replace the connection
- cp.conn = conn
- // re-init the channels
- cp.channels = make(map[string]*channel)
- cp.mu.Unlock()
- return nil
- }
-
- // start backoff retry
- errb := backoff.Retry(reconnect, expb)
- if errb != nil {
- fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
- // reconnection failed
- close(cp.connected)
- return
- }
- close(cp.connected)
- }
- }
-}
-
-// channel allocates new channel on amqp connection
-func (cp *chanPool) channel(name string) (*channel, error) {
- cp.mu.Lock()
- dead := cp.conn == nil
- cp.mu.Unlock()
-
- if dead {
- // wait for connection restoration (doubled the timeout duration)
- select {
- case <-time.NewTimer(cp.tout * 2).C:
- return nil, fmt.Errorf("connection is dead")
- case <-cp.connected:
- // connected
- }
- }
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn == nil {
- return nil, fmt.Errorf("connection has been closed")
- }
-
- if ch, ok := cp.channels[name]; ok {
- return ch, nil
- }
-
- // we must create new channel
- ch, err := cp.conn.Channel()
- if err != nil {
- return nil, err
- }
-
- // Enable publish confirmations
- if err = ch.Confirm(false); err != nil {
- return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err)
- }
-
- // we expect that every allocated channel would have listener on signal
- // this is not true only in case of pure producing channels
- cp.channels[name] = &channel{
- ch: ch,
- confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
- signal: make(chan error, 1),
- }
-
- return cp.channels[name], nil
-}
-
-// closeChan gracefully closes and removes channel allocation.
-func (cp *chanPool) closeChan(c *channel, err error) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- go func() {
- c.signal <- nil
- c.ch.Close()
- }()
-
- for name, ch := range cp.channels {
- if ch == c {
- delete(cp.channels, name)
- }
- }
-
- return err
-}
diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go
deleted file mode 100644
index 28999c36..00000000
--- a/plugins/jobs/oooold/broker/amqp/consume_test.go
+++ /dev/null
@@ -1,258 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- start := time.Now()
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 3*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go
deleted file mode 100644
index 00d62c51..00000000
--- a/plugins/jobs/oooold/broker/amqp/durability_test.go
+++ /dev/null
@@ -1,728 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5673/",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:5673",
- upstream: "localhost:5672",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2_2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // start when connection is dead
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(false)
-
- _, serr := b.Stat(pipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
-
- if j.Payload == "kill" && len(done) == 0 {
- proxy.reset(true)
- }
-
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go
deleted file mode 100644
index bd559715..00000000
--- a/plugins/jobs/oooold/broker/amqp/job.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
-)
-
-// pack job metadata into headers
-func pack(id string, attempt int, j *jobs.Job) amqp.Table {
- return amqp.Table{
- "rr-id": id,
- "rr-job": j.Job,
- "rr-attempt": int64(attempt),
- "rr-maxAttempts": int64(j.Options.Attempts),
- "rr-timeout": int64(j.Options.Timeout),
- "rr-delay": int64(j.Options.Delay),
- "rr-retryDelay": int64(j.Options.RetryDelay),
- }
-}
-
-// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
- j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
-
- if _, ok := d.Headers["rr-id"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
- }
-
- if _, ok := d.Headers["rr-attempt"].(int64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
- }
-
- if _, ok := d.Headers["rr-job"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
- }
- j.Job = d.Headers["rr-job"].(string)
-
- if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
- j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
- }
-
- if _, ok := d.Headers["rr-timeout"].(int64); ok {
- j.Options.Timeout = int(d.Headers["rr-timeout"].(int64))
- }
-
- if _, ok := d.Headers["rr-delay"].(int64); ok {
- j.Options.Delay = int(d.Headers["rr-delay"].(int64))
- }
-
- if _, ok := d.Headers["rr-retryDelay"].(int64); ok {
- j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64))
- }
-
- return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go
deleted file mode 100644
index 24ca453b..00000000
--- a/plugins/jobs/oooold/broker/amqp/job_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package amqp
-
-import (
- "github.com/streadway/amqp"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack_Errors(t *testing.T) {
- _, _, _, err := unpack(amqp.Delivery{
- Headers: map[string]interface{}{},
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- },
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- "rr-attempt": int64(0),
- },
- })
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go
deleted file mode 100644
index 6ef5f20f..00000000
--- a/plugins/jobs/oooold/broker/amqp/queue.go
+++ /dev/null
@@ -1,302 +0,0 @@
-package amqp
-
-import (
- "errors"
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
- "os"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type ExchangeType string
-
-const (
- Direct ExchangeType = "direct"
- Fanout ExchangeType = "fanout"
- Topic ExchangeType = "topic"
- Headers ExchangeType = "headers"
-)
-
-func (et ExchangeType) IsValid() error {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return nil
- }
- return errors.New("unknown exchange-type")
-}
-
-func (et ExchangeType) String() string {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return string(et)
- default:
- return "direct"
- }
-}
-
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- exchange string
- exchangeType ExchangeType
- name, key string
- consumer string
-
- // active consuming channel
- muc sync.Mutex
- cc *channel
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- running int32
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-// newQueue creates new queue wrapper for AMQP.
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
- }
-
- exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
-
- err := exchangeType.IsValid()
- if err != nil {
- return nil, fmt.Errorf(err.Error())
- }
-
- return &queue{
- exchange: pipe.String("exchange", "amqp.direct"),
- exchangeType: exchangeType,
- name: pipe.String("queue", ""),
- key: pipe.String("routing-key", pipe.String("queue", "")),
- consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
- pipe: pipe,
- lsn: lsn,
- }, nil
-}
-
-// serve consumes queue
-func (q *queue) serve(publish, consume *chanPool) {
- atomic.StoreInt32(&q.active, 1)
-
- for {
- <-consume.waitConnected()
- if atomic.LoadInt32(&q.active) == 0 {
- // stopped
- return
- }
-
- delivery, cc, err := q.consume(consume)
- if err != nil {
- q.report(err)
- continue
- }
-
- q.muc.Lock()
- q.cc = cc
- q.muc.Unlock()
-
- for d := range delivery {
- q.muw.Lock()
- q.wg.Add(1)
- q.muw.Unlock()
-
- atomic.AddInt32(&q.running, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, d amqp.Delivery) {
- err := q.do(publish, h, d)
-
- atomic.AddInt32(&q.running, ^int32(0))
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, d)
- }
- }
-}
-
-func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) {
- // allocate channel for the consuming
- if cc, err = consume.channel(q.name); err != nil {
- return nil, nil, err
- }
-
- if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil)
- if err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- // do i like it?
- go func(consume *chanPool) {
- for err := range cc.signal {
- consume.closeChan(cc, err)
- return
- }
- }(consume)
-
- return delivery, cc, err
-}
-
-func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error {
- id, attempt, j, err := unpack(d)
- if err != nil {
- q.report(err)
- return d.Nack(false, false)
- }
- err = h(id, j)
-
- if err == nil {
- return d.Ack(false)
- }
-
- // failed
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return d.Nack(false, false)
- }
-
- // retry as new j (to accommodate attempt number and new delay)
- if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil {
- q.report(err)
- return d.Nack(false, true)
- }
-
- return d.Ack(false)
-}
-
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- q.muc.Lock()
- if q.cc != nil {
- // gracefully stopped consuming
- q.report(q.cc.ch.Cancel(q.consumer, true))
- }
- q.muc.Unlock()
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// publish message to queue or to delayed queue.
-func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- qKey := q.key
-
- if delay != 0 {
- delayMs := int64(delay.Seconds() * 1000)
- qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
- qKey = qName
-
- err := q.declare(cp, qName, qName, amqp.Table{
- "x-dead-letter-exchange": q.exchange,
- "x-dead-letter-routing-key": q.name,
- "x-message-ttl": delayMs,
- "x-expires": delayMs * 2,
- })
-
- if err != nil {
- return err
- }
- }
-
- err = c.ch.Publish(
- q.exchange, // exchange
- qKey, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/octet-stream",
- Body: j.Body(),
- DeliveryMode: amqp.Persistent,
- Headers: pack(id, attempt, j),
- },
- )
-
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- confirmed, ok := <-c.confirm
- if ok && confirmed.Ack {
- return nil
- }
-
- return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag)
-}
-
-// declare queue and binding to it
-func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- _, err = c.ch.QueueDeclare(queue, true, false, false, false, args)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- err = c.ch.QueueBind(queue, key, q.exchange, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- // keep channel open
- return err
-}
-
-// inspect the queue
-func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
- c, err := cp.channel("stat")
- if err != nil {
- return nil, err
- }
-
- queue, err := c.ch.QueueInspect(q.name)
- if err != nil {
- return nil, cp.closeChan(c, err)
- }
-
- // keep channel open
- return &queue, err
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go
deleted file mode 100644
index ef19746c..00000000
--- a/plugins/jobs/oooold/broker/amqp/stat_test.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "sync"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
- exec <- func(id string, j *jobs.Job) error {
- defer wg.Done()
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- return nil
- }
-
- wg.Wait()
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go
deleted file mode 100644
index dc3ea518..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run consume using Broker service.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- conn *conn
- tubes map[*jobs.Pipeline]*tube
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init(cfg *Config) (bool, error) {
- b.cfg = cfg
- b.tubes = make(map[*jobs.Pipeline]*tube)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.tubes[pipe]; ok {
- return fmt.Errorf("tube `%s` has already been registered", pipe.Name())
- }
-
- t, err := newTube(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.tubes[pipe] = t
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.conn, err = b.cfg.newConn(); err != nil {
- return err
- }
- defer b.conn.Close()
-
- for _, t := range b.tubes {
- tt := t
- if tt.execPool != nil {
- go tt.serve(b.cfg)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, t := range b.tubes {
- t.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- t.stop()
-
- t.execPool = execPool
- t.errHandler = errHandler
-
- if b.conn != nil {
- tt := t
- if tt.execPool != nil {
- go tt.serve(connFactory(b.cfg))
- }
- }
-
- return nil
-}
-
-// Push data into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return "", fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- data, err := pack(j)
- if err != nil {
- return "", err
- }
-
- return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration())
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return nil, fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- return t.stat(b.conn)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) tube(pipe *jobs.Pipeline) *tube {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return nil
- }
-
- return t
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go
deleted file mode 100644
index cd2132af..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go
+++ /dev/null
@@ -1,276 +0,0 @@
-package beanstalk
-
-import (
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- "tube": "test",
- }
-
- cfg = &Config{
- Addr: "tcp://localhost:11300",
- }
-)
-
-func init() {
- conn, err := beanstalk.Dial("tcp", "localhost:11300")
- if err != nil {
- panic(err)
- }
- defer conn.Close()
-
- t := beanstalk.Tube{Name: "testTube", Conn: conn}
-
- for {
- id, _, err := t.PeekReady()
- if id == 0 || err != nil {
- break
- }
-
- if err := conn.Delete(id); err != nil {
- panic(err)
- }
- }
-}
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Invalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- }))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Error(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "tcp://localhost:11399",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go
deleted file mode 100644
index 3e48a2d7..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "strings"
- "time"
-)
-
-// Config defines beanstalk broker configuration.
-type Config struct {
- // Addr of beanstalk server.
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("beanstalk address is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// size creates new rpc socket Listener.
-func (c *Config) newConn() (*conn, error) {
- dsn := strings.Split(c.Addr, "://")
- if len(dsn) != 2 {
- return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)")
- }
-
- return newConn(dsn[0], dsn[1], c.TimeoutDuration())
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go
deleted file mode 100644
index 4ba08a04..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package beanstalk
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func TestConfig_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"addr":"tcp"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
-
-func TestConfig_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"addr":"unix://sock.bean"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go
deleted file mode 100644
index 7aba6bbb..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/conn.go
+++ /dev/null
@@ -1,180 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "strings"
- "sync"
- "time"
-)
-
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
-
-// creates new connections
-type connFactory interface {
- newConn() (*conn, error)
-}
-
-// conn protects allocation for one connection between
-// threads and provides reconnecting capabilities.
-type conn struct {
- tout time.Duration
- conn *beanstalk.Conn
- alive bool
- free chan interface{}
- dead chan interface{}
- stop chan interface{}
- lock *sync.Cond
-}
-
-// creates new beanstalk connection and reconnect watcher.
-func newConn(network, addr string, tout time.Duration) (cn *conn, err error) {
- cn = &conn{
- tout: tout,
- alive: true,
- free: make(chan interface{}, 1),
- dead: make(chan interface{}, 1),
- stop: make(chan interface{}),
- lock: sync.NewCond(&sync.Mutex{}),
- }
-
- cn.conn, err = beanstalk.Dial(network, addr)
- if err != nil {
- return nil, err
- }
-
- go cn.watch(network, addr)
-
- return cn, nil
-}
-
-// reset the connection and reconnect watcher.
-func (cn *conn) Close() error {
- cn.lock.L.Lock()
- defer cn.lock.L.Unlock()
-
- close(cn.stop)
- for cn.alive {
- cn.lock.Wait()
- }
-
- return nil
-}
-
-// acquire connection instance or return error in case of timeout. When mandratory set to true
-// timeout won't be applied.
-func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) {
- // do not apply timeout on mandatory connections
- if mandatory {
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- }
- }
-
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- default:
- // *2 to handle commands called right after the connection reset
- tout := time.NewTimer(cn.tout * 2)
- select {
- case <-cn.stop:
- tout.Stop()
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- tout.Stop()
- return cn.conn, nil
- case <-tout.C:
- return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout)
- }
- }
-}
-
-// release acquired connection.
-func (cn *conn) release(err error) error {
- if isConnError(err) {
- // reconnect is required
- cn.dead <- err
- } else {
- cn.free <- nil
- }
-
- return err
-}
-
-// watch and reconnect if dead
-func (cn *conn) watch(network, addr string) {
- cn.free <- nil
- t := time.NewTicker(WatchThrottleLimit)
- defer t.Stop()
- for {
- select {
- case <-cn.dead:
- // simple throttle limiter
- <-t.C
- // try to reconnect
- // TODO add logging here
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cn.tout
-
- reconnect := func() error {
- conn, err := beanstalk.Dial(network, addr)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------beanstalk successfully redialed------")
-
- cn.conn = conn
- cn.free <- nil
- return nil
- }
-
- err := backoff.Retry(reconnect, expb)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
- cn.dead <- nil
- }
-
- case <-cn.stop:
- cn.lock.L.Lock()
- select {
- case <-cn.dead:
- case <-cn.free:
- }
-
- // stop underlying connection
- cn.conn.Close()
- cn.alive = false
- cn.lock.Signal()
-
- cn.lock.L.Unlock()
-
- return
- }
- }
-}
-
-// isConnError indicates that error is related to dead socket.
-func isConnError(err error) bool {
- if err == nil {
- return false
- }
-
- for _, errStr := range connErrors {
- // golang...
- if strings.Contains(err.Error(), errStr) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go
deleted file mode 100644
index 84be305e..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/constants.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package beanstalk
-
-import "time"
-
-// WatchThrottleLimit is used to limit reconnection occurrence in watch function
-const WatchThrottleLimit = time.Second \ No newline at end of file
diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go
deleted file mode 100644
index b16866ae..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go
+++ /dev/null
@@ -1,242 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go
deleted file mode 100644
index 499a5206..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go
+++ /dev/null
@@ -1,575 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "tcp://localhost:11301",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:11301",
- upstream: "localhost:11300",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // reoccuring
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(true)
-
- // auto-reconnect
- _, serr = b.Stat(pipe)
- assert.NoError(t, serr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go
deleted file mode 100644
index fd9c8c3c..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/job.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "encoding/gob"
- "github.com/spiral/jobs/v2"
-)
-
-func pack(j *jobs.Job) ([]byte, error) {
- b := new(bytes.Buffer)
- err := gob.NewEncoder(b).Encode(j)
- if err != nil {
- return nil, err
- }
-
- return b.Bytes(), nil
-}
-
-func unpack(data []byte) (*jobs.Job, error) {
- j := &jobs.Job{}
- err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j)
-
- return j, err
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean
deleted file mode 100644
index e69de29b..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/sock.bean
+++ /dev/null
diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go
deleted file mode 100644
index 14a55859..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // beanstalk reserves job right after push
- time.Sleep(time.Millisecond * 100)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go
deleted file mode 100644
index 9d7ad117..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube.go
+++ /dev/null
@@ -1,250 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type tube struct {
- active int32
- pipe *jobs.Pipeline
- mut sync.Mutex
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
- reserve time.Duration
-
- // tube events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id uint64
- data []byte
-}
-
-func (e *entry) String() string {
- return fmt.Sprintf("%v", e.id)
-}
-
-// create new tube consumer and producer
-func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
- if pipe.String("tube", "") == "" {
- return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
- }
-
- return &tube{
- pipe: pipe,
- tube: &beanstalk.Tube{Name: pipe.String("tube", "")},
- tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
- reserve: pipe.Duration("reserve", time.Second),
- lsn: lsn,
- }, nil
-}
-
-// run consumers
-func (t *tube) serve(connector connFactory) {
- // tube specific consume connection
- cn, err := connector.newConn()
- if err != nil {
- t.report(err)
- return
- }
- defer cn.Close()
-
- t.wait = make(chan interface{})
- atomic.StoreInt32(&t.active, 1)
-
- for {
- e, err := t.consume(cn)
- if err != nil {
- if isConnError(err) {
- t.report(err)
- }
- continue
- }
-
- if e == nil {
- return
- }
-
- h := <-t.execPool
- go func(h jobs.Handler, e *entry) {
- err := t.do(cn, h, e)
- t.execPool <- h
- t.wg.Done()
- t.report(err)
- }(h, e)
- }
-}
-
-// fetch consume
-func (t *tube) consume(cn *conn) (*entry, error) {
- t.muw.Lock()
- defer t.muw.Unlock()
-
- select {
- case <-t.wait:
- return nil, nil
- default:
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.tubeSet.Conn = conn
-
- id, data, err := t.tubeSet.Reserve(t.reserve)
- cn.release(err)
-
- if err != nil {
- return nil, err
- }
-
- t.wg.Add(1)
- return &entry{id: id, data: data}, nil
- }
-}
-
-// do data
-func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
- j, err := unpack(e.data)
- if err != nil {
- return err
- }
-
- err = h(e.String(), j)
-
- // mandatory acquisition
- conn, connErr := cn.acquire(true)
- if connErr != nil {
- // possible if server is dead
- return connErr
- }
-
- if err == nil {
- return cn.release(conn.Delete(e.id))
- }
-
- stat, statErr := conn.StatsJob(e.id)
- if statErr != nil {
- return cn.release(statErr)
- }
-
- t.errHandler(e.String(), j, err)
-
- reserves, ok := strconv.Atoi(stat["reserves"])
- if ok != nil || !j.Options.CanRetry(reserves-1) {
- return cn.release(conn.Bury(e.id, 0))
- }
-
- return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
-}
-
-// stop tube consuming
-func (t *tube) stop() {
- if atomic.LoadInt32(&t.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&t.active, 0)
-
- close(t.wait)
-
- t.muw.Lock()
- t.wg.Wait()
- t.muw.Unlock()
-}
-
-// put data into pool or return error (no wait), this method will try to reattempt operation if
-// dead conn found.
-func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- id, err = t.doPut(cn, attempt, data, delay, rrt)
- if err != nil && isConnError(err) {
- return t.doPut(cn, attempt, data, delay, rrt)
- }
-
- return id, err
-}
-
-// perform put operation
-func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return "", err
- }
-
- var bid uint64
-
- t.mut.Lock()
- t.tube.Conn = conn
- bid, err = t.tube.Put(data, 0, delay, rrt)
- t.mut.Unlock()
-
- return strconv.FormatUint(bid, 10), cn.release(err)
-}
-
-// return tube stats (retries)
-func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
- stat, err = t.doStat(cn)
- if err != nil && isConnError(err) {
- return t.doStat(cn)
- }
-
- return stat, err
-}
-
-// return tube stats
-func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.mut.Lock()
- t.tube.Conn = conn
- values, err := t.tube.Stats()
- t.mut.Unlock()
-
- if err != nil {
- return nil, cn.release(err)
- }
-
- stat = &jobs.Stat{InternalName: t.tube.Name}
-
- if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
- stat.Queue = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
- stat.Active = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
- stat.Delayed = int64(v)
- }
-
- return stat, cn.release(nil)
-}
-
-// report tube specific error
-func (t *tube) report(err error) {
- if err != nil {
- t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go
deleted file mode 100644
index b6a646f4..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package beanstalk
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestTube_CantServe(t *testing.T) {
- var gctx interface{}
- tube := &tube{
- lsn: func(event int, ctx interface{}) {
- gctx = ctx
- },
- }
-
- tube.serve(&Config{Addr: "broken"})
- assert.Error(t, gctx.(error))
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go
deleted file mode 100644
index 385bb175..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run queue using local goroutines.
-type Broker struct {
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init() (bool, error) {
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() error {
- // start consuming
- b.mu.Lock()
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve()
- }
- }
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- // stop all consuming
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.wait != nil {
- if q.execPool != nil {
- go q.serve()
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q.push(id.String(), j, 0, j.Options.DelayDuration())
-
- return id.String(), nil
-}
-
-// Stat must consume statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(), nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go
deleted file mode 100644
index c1b40276..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go
+++ /dev/null
@@ -1,221 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "local",
- "name": "default",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init()
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- b.Init()
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go
deleted file mode 100644
index d764a984..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go
+++ /dev/null
@@ -1,253 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go
deleted file mode 100644
index a24bc216..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/queue.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- on int32
- state *jobs.Stat
-
- // job pipeline
- concurPool chan interface{}
- jobs chan *entry
-
- // on operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id string
- job *jobs.Job
- attempt int
-}
-
-// create new queue
-func newQueue(maxConcur int) *queue {
- q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
-
- if maxConcur != 0 {
- q.concurPool = make(chan interface{}, maxConcur)
- for i := 0; i < maxConcur; i++ {
- q.concurPool <- nil
- }
- }
-
- return q
-}
-
-// serve consumers
-func (q *queue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.on, 1)
-
- for {
- e := q.consume()
- if e == nil {
- q.wg.Wait()
- return
- }
-
- if q.concurPool != nil {
- <-q.concurPool
- }
-
- atomic.AddInt64(&q.state.Active, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, e *entry) {
- defer q.wg.Done()
-
- q.do(h, e)
- atomic.AddInt64(&q.state.Active, ^int64(0))
-
- q.execPool <- h
-
- if q.concurPool != nil {
- q.concurPool <- nil
- }
- }(h, e)
- }
-}
-
-// allocate one job entry
-func (q *queue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *queue) do(h jobs.Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.on) == 0 {
- return
- }
-
- close(q.wait)
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-
- atomic.StoreInt32(&q.on, 0)
-}
-
-// add job to the queue
-func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.state.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.state.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.state.Delayed, ^int64(0))
- atomic.AddInt64(&q.state.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *queue) stat() *jobs.Stat {
- return &jobs.Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.state.Queue),
- Active: atomic.LoadInt64(&q.state.Active),
- Delayed: atomic.LoadInt64(&q.state.Delayed),
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go
deleted file mode 100644
index 0894323c..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go
deleted file mode 100644
index 8cc62b6b..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker represents SQS broker.
-type Broker struct {
- cfg *Config
- sqs *sqs.SQS
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures SQS broker.
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- b.sqs, err = b.cfg.SQS()
- if err != nil {
- return err
- }
-
- for _, q := range b.queues {
- q.url, err = q.declareQueue(b.sqs)
- if err != nil {
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.sqs, b.cfg.TimeoutDuration())
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- b.wait <- nil
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.sqs != nil && q.execPool != nil {
- go q.serve(b.sqs, b.cfg.TimeoutDuration())
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if j.Options.Delay > 900 || j.Options.RetryDelay > 900 {
- return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name())
- }
-
- return q.send(b.sqs, j)
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(b.sqs)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go
deleted file mode 100644
index c87b302d..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker_test.go
+++ /dev/null
@@ -1,275 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-
- cfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9324",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_RegisterInvalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- }))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "invalid",
- "declare": map[string]interface{}{
- "VisibilityTimeout": "invalid",
- },
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go
deleted file mode 100644
index d0c2f2b2..00000000
--- a/plugins/jobs/oooold/broker/sqs/config.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/credentials"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/roadrunner/service"
- "time"
-)
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Region defined SQS region, not required when endpoint is not empty.
- Region string
-
- // Region defined AWS API key, not required when endpoint is not empty.
- Key string
-
- // Region defined AWS API secret, not required when endpoint is not empty.
- Secret string
-
- // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development.
- Endpoint string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Region == "" {
- return fmt.Errorf("SQS region is missing")
- }
-
- if c.Key == "" {
- return fmt.Errorf("SQS key is missing")
- }
-
- if c.Secret == "" {
- return fmt.Errorf("SQS secret is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// Session returns new AWS session.
-func (c *Config) Session() (*session.Session, error) {
- return session.NewSession(&aws.Config{
- Region: aws.String(c.Region),
- Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""),
- })
-}
-
-// SQS returns new SQS instance or error.
-func (c *Config) SQS() (*sqs.SQS, error) {
- sess, err := c.Session()
- if err != nil {
- return nil, err
- }
-
- if c.Endpoint == "" {
- return sqs.New(sess), nil
- }
-
- return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go
deleted file mode 100644
index b36b3c6f..00000000
--- a/plugins/jobs/oooold/broker/sqs/config_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package sqs
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error5(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go
deleted file mode 100644
index 434fc6ea..00000000
--- a/plugins/jobs/oooold/broker/sqs/consume_test.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_PushTooBigDelay(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- Delay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_Consume_PushTooBigDelay2(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- RetryDelay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
- b.Stop()
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go
deleted file mode 100644
index 58ddf4b9..00000000
--- a/plugins/jobs/oooold/broker/sqs/durability_test.go
+++ /dev/null
@@ -1,588 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9325",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:9325",
- upstream: "localhost:9324",
- accept: true,
- }
-
- proxyPipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "lockReserved": 1,
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(false)
-
- _, serr = b.Stat(proxyPipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- _, serr = b.Stat(proxyPipe)
- assert.NoError(t, serr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go
deleted file mode 100644
index 50e2c164..00000000
--- a/plugins/jobs/oooold/broker/sqs/job.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "time"
-)
-
-var jobAttributes = []*string{
- aws.String("rr-job"),
- aws.String("rr-maxAttempts"),
- aws.String("rr-delay"),
- aws.String("rr-timeout"),
- aws.String("rr-retryDelay"),
-}
-
-// pack job metadata into headers
-func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
- return &sqs.SendMessageInput{
- QueueUrl: url,
- DelaySeconds: aws.Int64(int64(j.Options.Delay)),
- MessageBody: aws.String(j.Payload),
- MessageAttributes: map[string]*sqs.MessageAttributeValue{
- "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
- "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
- "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
- "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
- "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
- },
- }
-}
-
-// unpack restores jobs.Options
-func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
- if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
- return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
- }
- attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
-
- for _, attr := range jobAttributes {
- if _, ok := msg.MessageAttributes[*attr]; !ok {
- return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
- }
- }
-
- j = &jobs.Job{
- Job: *msg.MessageAttributes["rr-job"].StringValue,
- Payload: *msg.Body,
- Options: &jobs.Options{},
- }
-
- if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
- j.Options.Delay = delay
- }
-
- if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
- j.Options.Attempts = maxAttempts
- }
-
- if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
- j.Options.Timeout = timeout
- }
-
- if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
- j.Options.RetryDelay = retryDelay
- }
-
- return *msg.MessageId, attempt - 1, j, nil
-}
-
-func awsString(n int) *string {
- return aws.String(strconv.Itoa(n))
-}
-
-func awsDuration(d time.Duration) *string {
- return aws.String(strconv.Itoa(int(d.Seconds())))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go
deleted file mode 100644
index a120af53..00000000
--- a/plugins/jobs/oooold/broker/sqs/job_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package sqs
-
-import (
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack(t *testing.T) {
- msg := &sqs.Message{
- Body: aws.String("body"),
- Attributes: map[string]*string{},
- MessageAttributes: map[string]*sqs.MessageAttributeValue{},
- }
-
- _, _, _, err := unpack(msg)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go
deleted file mode 100644
index 8a92448e..00000000
--- a/plugins/jobs/oooold/broker/sqs/queue.go
+++ /dev/null
@@ -1,266 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- url *string
- reserve time.Duration
- lockReserved time.Duration
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name())
- }
-
- return &queue{
- pipe: pipe,
- reserve: pipe.Duration("reserve", time.Second),
- lockReserved: pipe.Duration("lockReserved", 300*time.Second),
- lsn: lsn,
- }, nil
-}
-
-// declareQueue declared queue
-func (q *queue) declareQueue(s *sqs.SQS) (*string, error) {
- attr := make(map[string]*string)
- for k, v := range q.pipe.Map("declare") {
- if vs, ok := v.(string); ok {
- attr[k] = aws.String(vs)
- }
-
- if vi, ok := v.(int); ok {
- attr[k] = aws.String(strconv.Itoa(vi))
- }
-
- if vb, ok := v.(bool); ok {
- if vb {
- attr[k] = aws.String("true")
- } else {
- attr[k] = aws.String("false")
- }
- }
- }
-
- if len(attr) != 0 {
- r, err := s.CreateQueue(&sqs.CreateQueueInput{
- QueueName: aws.String(q.pipe.String("queue", "")),
- Attributes: attr,
- })
-
- return r.QueueUrl, err
- }
-
- // no need to create (get existed)
- r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))})
- if err != nil {
- return nil, err
- }
-
- return r.QueueUrl, nil
-}
-
-// serve consumers
-func (q *queue) serve(s *sqs.SQS, tout time.Duration) {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.active, 1)
-
- var errored bool
- for {
- messages, stop, err := q.consume(s)
- if err != nil {
- if errored {
- // reoccurring error
- time.Sleep(tout)
- } else {
- errored = true
- q.report(err)
- }
-
- continue
- }
- errored = false
-
- if stop {
- return
- }
-
- for _, msg := range messages {
- h := <-q.execPool
- go func(h jobs.Handler, msg *sqs.Message) {
- err := q.do(s, h, msg)
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, msg)
- }
- }
-}
-
-// consume and allocate connection.
-func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil, true, nil
- default:
- r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{
- QueueUrl: q.url,
- MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))),
- WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())),
- VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())),
- AttributeNames: []*string{aws.String("ApproximateReceiveCount")},
- MessageAttributeNames: jobAttributes,
- })
- if err != nil {
- return nil, false, err
- }
-
- q.wg.Add(len(r.Messages))
-
- return r.Messages, false, nil
- }
-}
-
-// do single message
-func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) {
- id, attempt, j, err := unpack(msg)
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- // block the job based on known timeout
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())),
- })
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- err = h(id, j)
- if err == nil {
- return q.deleteMessage(s, msg, nil)
- }
-
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return q.deleteMessage(s, msg, err)
- }
-
- // retry after specified duration
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)),
- })
-
- return err
-}
-
-func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error {
- _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle})
- return drr
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- close(q.wait)
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// add job to the queue
-func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) {
- r, err := s.SendMessage(pack(q.url, j))
- if err != nil {
- return "", err
- }
-
- return *r.MessageId, nil
-}
-
-// return queue stats
-func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
- r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{
- QueueUrl: q.url,
- AttributeNames: []*string{
- aws.String("ApproximateNumberOfMessages"),
- aws.String("ApproximateNumberOfMessagesDelayed"),
- aws.String("ApproximateNumberOfMessagesNotVisible"),
- },
- })
-
- if err != nil {
- return nil, err
- }
-
- stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")}
-
- for a, v := range r.Attributes {
- if a == "ApproximateNumberOfMessages" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Queue = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesNotVisible" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Active = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesDelayed" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Delayed = int64(v)
- }
- }
- }
-
- return stat, nil
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go
deleted file mode 100644
index 5031571b..00000000
--- a/plugins/jobs/oooold/broker/sqs/stat_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // unable to use approximated stats
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- _, err := b.Stat(pipe)
- assert.NoError(t, err)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-}