summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker_test.go419
-rw-r--r--plugins/jobs/oooold/broker/amqp/conn.go232
-rw-r--r--plugins/jobs/oooold/broker/amqp/consume_test.go258
-rw-r--r--plugins/jobs/oooold/broker/amqp/durability_test.go728
-rw-r--r--plugins/jobs/oooold/broker/amqp/job.go56
-rw-r--r--plugins/jobs/oooold/broker/amqp/job_test.go29
-rw-r--r--plugins/jobs/oooold/broker/amqp/queue.go302
-rw-r--r--plugins/jobs/oooold/broker/amqp/stat_test.go63
9 files changed, 0 insertions, 2303 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
deleted file mode 100644
index b47d83ee..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker.go
+++ /dev/null
@@ -1,216 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
-)
-
-// Broker represents AMQP broker.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- publish *chanPool
- consume *chanPool
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures AMQP job broker (always 2 connections).
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.publish.Close()
-
- if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.consume.Close()
-
- for _, q := range b.queues {
- err := q.declare(b.publish, q.name, q.key, nil)
- if err != nil {
- b.mu.Unlock()
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.publish, b.consume)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.publish != nil && q.execPool != nil {
- if q.execPool != nil {
- go q.serve(b.publish, b.consume)
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
- return "", err
- }
-
- return id.String(), nil
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- queue, err := q.inspect(b.publish)
- if err != nil {
- return nil, err
- }
-
- // this the closest approximation we can get for now
- return &jobs.Stat{
- InternalName: queue.Name,
- Queue: int64(queue.Messages),
- Active: int64(atomic.LoadInt32(&q.running)),
- }, nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go
deleted file mode 100644
index 66078099..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker_test.go
+++ /dev/null
@@ -1,419 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "queue": "rr-queue",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }
-
- cfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-var (
- fanoutPipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "fanout",
- "queue": "fanout-queue",
- "exchange": "fanout-exchange",
- "exchange-type": "fanout",
- "prefetch": 1,
- }
-
- fanoutCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_BadPipeline(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_CantStart(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "amqp://guest:guest@localhost:15672/",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal()
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_Queue_RoutingKey(t *testing.T) {
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
-}
-
-func TestBroker_Register_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Queue_ExchangeType(t *testing.T) {
- pipeWithKey := pipe.With("exchange-type", "direct")
-
- assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
-}
-
-func TestBroker_Register_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "fanout")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "xxx")
-
- assert.Error(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(fanoutCfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go
deleted file mode 100644
index be747776..00000000
--- a/plugins/jobs/oooold/broker/amqp/conn.go
+++ /dev/null
@@ -1,232 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/cenkalti/backoff/v4"
- "github.com/streadway/amqp"
- "sync"
- "time"
-)
-
-// manages set of AMQP channels
-type chanPool struct {
- // timeout to backoff redial
- tout time.Duration
- url string
-
- mu *sync.Mutex
-
- conn *amqp.Connection
- channels map[string]*channel
- wait chan interface{}
- connected chan interface{}
-}
-
-// manages single channel
-type channel struct {
- ch *amqp.Channel
- // todo unused
- //consumer string
- confirm chan amqp.Confirmation
- signal chan error
-}
-
-// newConn creates new watched AMQP connection
-func newConn(url string, tout time.Duration) (*chanPool, error) {
- conn, err := dial(url)
- if err != nil {
- return nil, err
- }
-
- cp := &chanPool{
- url: url,
- tout: tout,
- conn: conn,
- mu: &sync.Mutex{},
- channels: make(map[string]*channel),
- wait: make(chan interface{}),
- connected: make(chan interface{}),
- }
-
- close(cp.connected)
- go cp.watch()
- return cp, nil
-}
-
-// dial dials to AMQP.
-func dial(url string) (*amqp.Connection, error) {
- return amqp.Dial(url)
-}
-
-// Close gracefully closes all underlying channels and connection.
-func (cp *chanPool) Close() error {
- cp.mu.Lock()
-
- close(cp.wait)
- if cp.channels == nil {
- return fmt.Errorf("connection is dead")
- }
-
- // close all channels and consume
- var wg sync.WaitGroup
- for _, ch := range cp.channels {
- wg.Add(1)
-
- go func(ch *channel) {
- defer wg.Done()
- cp.closeChan(ch, nil)
- }(ch)
- }
- cp.mu.Unlock()
-
- wg.Wait()
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn != nil {
- return cp.conn.Close()
- }
-
- return nil
-}
-
-// waitConnected waits till connection is connected again or eventually closed.
-// must only be invoked after connection error has been delivered to channel.signal.
-func (cp *chanPool) waitConnected() chan interface{} {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- return cp.connected
-}
-
-// watch manages connection state and reconnects if needed
-func (cp *chanPool) watch() {
- for {
- select {
- case <-cp.wait:
- // connection has been closed
- return
- // here we are waiting for the errors from amqp connection
- case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
- cp.mu.Lock()
- // clear connected, since connections are dead
- cp.connected = make(chan interface{})
-
- // broadcast error to all consume to let them for the tryReconnect
- for _, ch := range cp.channels {
- ch.signal <- err
- }
-
- // disable channel allocation while server is dead
- cp.conn = nil
- cp.channels = nil
-
- // initialize the backoff
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cp.tout
- cp.mu.Unlock()
-
- // reconnect function
- reconnect := func() error {
- cp.mu.Lock()
- conn, err := dial(cp.url)
- if err != nil {
- // still failing
- fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
- cp.mu.Unlock()
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------amqp successfully redialed------")
-
- // here we are reconnected
- // replace the connection
- cp.conn = conn
- // re-init the channels
- cp.channels = make(map[string]*channel)
- cp.mu.Unlock()
- return nil
- }
-
- // start backoff retry
- errb := backoff.Retry(reconnect, expb)
- if errb != nil {
- fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
- // reconnection failed
- close(cp.connected)
- return
- }
- close(cp.connected)
- }
- }
-}
-
-// channel allocates new channel on amqp connection
-func (cp *chanPool) channel(name string) (*channel, error) {
- cp.mu.Lock()
- dead := cp.conn == nil
- cp.mu.Unlock()
-
- if dead {
- // wait for connection restoration (doubled the timeout duration)
- select {
- case <-time.NewTimer(cp.tout * 2).C:
- return nil, fmt.Errorf("connection is dead")
- case <-cp.connected:
- // connected
- }
- }
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn == nil {
- return nil, fmt.Errorf("connection has been closed")
- }
-
- if ch, ok := cp.channels[name]; ok {
- return ch, nil
- }
-
- // we must create new channel
- ch, err := cp.conn.Channel()
- if err != nil {
- return nil, err
- }
-
- // Enable publish confirmations
- if err = ch.Confirm(false); err != nil {
- return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err)
- }
-
- // we expect that every allocated channel would have listener on signal
- // this is not true only in case of pure producing channels
- cp.channels[name] = &channel{
- ch: ch,
- confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
- signal: make(chan error, 1),
- }
-
- return cp.channels[name], nil
-}
-
-// closeChan gracefully closes and removes channel allocation.
-func (cp *chanPool) closeChan(c *channel, err error) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- go func() {
- c.signal <- nil
- c.ch.Close()
- }()
-
- for name, ch := range cp.channels {
- if ch == c {
- delete(cp.channels, name)
- }
- }
-
- return err
-}
diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go
deleted file mode 100644
index 28999c36..00000000
--- a/plugins/jobs/oooold/broker/amqp/consume_test.go
+++ /dev/null
@@ -1,258 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- start := time.Now()
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 3*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go
deleted file mode 100644
index 00d62c51..00000000
--- a/plugins/jobs/oooold/broker/amqp/durability_test.go
+++ /dev/null
@@ -1,728 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5673/",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:5673",
- upstream: "localhost:5672",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2_2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // start when connection is dead
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(false)
-
- _, serr := b.Stat(pipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
-
- if j.Payload == "kill" && len(done) == 0 {
- proxy.reset(true)
- }
-
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go
deleted file mode 100644
index bd559715..00000000
--- a/plugins/jobs/oooold/broker/amqp/job.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
-)
-
-// pack job metadata into headers
-func pack(id string, attempt int, j *jobs.Job) amqp.Table {
- return amqp.Table{
- "rr-id": id,
- "rr-job": j.Job,
- "rr-attempt": int64(attempt),
- "rr-maxAttempts": int64(j.Options.Attempts),
- "rr-timeout": int64(j.Options.Timeout),
- "rr-delay": int64(j.Options.Delay),
- "rr-retryDelay": int64(j.Options.RetryDelay),
- }
-}
-
-// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
- j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
-
- if _, ok := d.Headers["rr-id"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
- }
-
- if _, ok := d.Headers["rr-attempt"].(int64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
- }
-
- if _, ok := d.Headers["rr-job"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
- }
- j.Job = d.Headers["rr-job"].(string)
-
- if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
- j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
- }
-
- if _, ok := d.Headers["rr-timeout"].(int64); ok {
- j.Options.Timeout = int(d.Headers["rr-timeout"].(int64))
- }
-
- if _, ok := d.Headers["rr-delay"].(int64); ok {
- j.Options.Delay = int(d.Headers["rr-delay"].(int64))
- }
-
- if _, ok := d.Headers["rr-retryDelay"].(int64); ok {
- j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64))
- }
-
- return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go
deleted file mode 100644
index 24ca453b..00000000
--- a/plugins/jobs/oooold/broker/amqp/job_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package amqp
-
-import (
- "github.com/streadway/amqp"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack_Errors(t *testing.T) {
- _, _, _, err := unpack(amqp.Delivery{
- Headers: map[string]interface{}{},
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- },
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- "rr-attempt": int64(0),
- },
- })
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go
deleted file mode 100644
index 6ef5f20f..00000000
--- a/plugins/jobs/oooold/broker/amqp/queue.go
+++ /dev/null
@@ -1,302 +0,0 @@
-package amqp
-
-import (
- "errors"
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
- "os"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type ExchangeType string
-
-const (
- Direct ExchangeType = "direct"
- Fanout ExchangeType = "fanout"
- Topic ExchangeType = "topic"
- Headers ExchangeType = "headers"
-)
-
-func (et ExchangeType) IsValid() error {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return nil
- }
- return errors.New("unknown exchange-type")
-}
-
-func (et ExchangeType) String() string {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return string(et)
- default:
- return "direct"
- }
-}
-
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- exchange string
- exchangeType ExchangeType
- name, key string
- consumer string
-
- // active consuming channel
- muc sync.Mutex
- cc *channel
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- running int32
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-// newQueue creates new queue wrapper for AMQP.
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
- }
-
- exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
-
- err := exchangeType.IsValid()
- if err != nil {
- return nil, fmt.Errorf(err.Error())
- }
-
- return &queue{
- exchange: pipe.String("exchange", "amqp.direct"),
- exchangeType: exchangeType,
- name: pipe.String("queue", ""),
- key: pipe.String("routing-key", pipe.String("queue", "")),
- consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
- pipe: pipe,
- lsn: lsn,
- }, nil
-}
-
-// serve consumes queue
-func (q *queue) serve(publish, consume *chanPool) {
- atomic.StoreInt32(&q.active, 1)
-
- for {
- <-consume.waitConnected()
- if atomic.LoadInt32(&q.active) == 0 {
- // stopped
- return
- }
-
- delivery, cc, err := q.consume(consume)
- if err != nil {
- q.report(err)
- continue
- }
-
- q.muc.Lock()
- q.cc = cc
- q.muc.Unlock()
-
- for d := range delivery {
- q.muw.Lock()
- q.wg.Add(1)
- q.muw.Unlock()
-
- atomic.AddInt32(&q.running, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, d amqp.Delivery) {
- err := q.do(publish, h, d)
-
- atomic.AddInt32(&q.running, ^int32(0))
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, d)
- }
- }
-}
-
-func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) {
- // allocate channel for the consuming
- if cc, err = consume.channel(q.name); err != nil {
- return nil, nil, err
- }
-
- if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil)
- if err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- // do i like it?
- go func(consume *chanPool) {
- for err := range cc.signal {
- consume.closeChan(cc, err)
- return
- }
- }(consume)
-
- return delivery, cc, err
-}
-
-func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error {
- id, attempt, j, err := unpack(d)
- if err != nil {
- q.report(err)
- return d.Nack(false, false)
- }
- err = h(id, j)
-
- if err == nil {
- return d.Ack(false)
- }
-
- // failed
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return d.Nack(false, false)
- }
-
- // retry as new j (to accommodate attempt number and new delay)
- if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil {
- q.report(err)
- return d.Nack(false, true)
- }
-
- return d.Ack(false)
-}
-
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- q.muc.Lock()
- if q.cc != nil {
- // gracefully stopped consuming
- q.report(q.cc.ch.Cancel(q.consumer, true))
- }
- q.muc.Unlock()
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// publish message to queue or to delayed queue.
-func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- qKey := q.key
-
- if delay != 0 {
- delayMs := int64(delay.Seconds() * 1000)
- qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
- qKey = qName
-
- err := q.declare(cp, qName, qName, amqp.Table{
- "x-dead-letter-exchange": q.exchange,
- "x-dead-letter-routing-key": q.name,
- "x-message-ttl": delayMs,
- "x-expires": delayMs * 2,
- })
-
- if err != nil {
- return err
- }
- }
-
- err = c.ch.Publish(
- q.exchange, // exchange
- qKey, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/octet-stream",
- Body: j.Body(),
- DeliveryMode: amqp.Persistent,
- Headers: pack(id, attempt, j),
- },
- )
-
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- confirmed, ok := <-c.confirm
- if ok && confirmed.Ack {
- return nil
- }
-
- return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag)
-}
-
-// declare queue and binding to it
-func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- _, err = c.ch.QueueDeclare(queue, true, false, false, false, args)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- err = c.ch.QueueBind(queue, key, q.exchange, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- // keep channel open
- return err
-}
-
-// inspect the queue
-func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
- c, err := cp.channel("stat")
- if err != nil {
- return nil, err
- }
-
- queue, err := c.ch.QueueInspect(q.name)
- if err != nil {
- return nil, cp.closeChan(c, err)
- }
-
- // keep channel open
- return &queue, err
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go
deleted file mode 100644
index ef19746c..00000000
--- a/plugins/jobs/oooold/broker/amqp/stat_test.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "sync"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
- exec <- func(id string, j *jobs.Job) error {
- defer wg.Done()
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- return nil
- }
-
- wg.Wait()
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}