summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-16 12:56:02 +0300
committerValery Piashchynski <[email protected]>2021-06-16 12:56:02 +0300
commitcee4bc46097506d6e892b6af194751434700621a (patch)
treee542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/broker/amqp
parentd4c92e48bada7593b6fbec612a742c599de6e736 (diff)
- Update jobs sources
- Update Arch diagramm Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker_test.go419
-rw-r--r--plugins/jobs/oooold/broker/amqp/config.go39
-rw-r--r--plugins/jobs/oooold/broker/amqp/config_test.go27
-rw-r--r--plugins/jobs/oooold/broker/amqp/conn.go232
-rw-r--r--plugins/jobs/oooold/broker/amqp/consume_test.go258
-rw-r--r--plugins/jobs/oooold/broker/amqp/durability_test.go728
-rw-r--r--plugins/jobs/oooold/broker/amqp/job.go56
-rw-r--r--plugins/jobs/oooold/broker/amqp/job_test.go29
-rw-r--r--plugins/jobs/oooold/broker/amqp/queue.go302
-rw-r--r--plugins/jobs/oooold/broker/amqp/stat_test.go63
11 files changed, 2369 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
new file mode 100644
index 00000000..b47d83ee
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/broker.go
@@ -0,0 +1,216 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "github.com/spiral/jobs/v2"
+ "sync"
+ "sync/atomic"
+)
+
+// Broker represents AMQP broker.
+type Broker struct {
+ cfg *Config
+ lsn func(event int, ctx interface{})
+ publish *chanPool
+ consume *chanPool
+ mu sync.Mutex
+ wait chan error
+ stopped chan interface{}
+ queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures AMQP job broker (always 2 connections).
+func (b *Broker) Init(cfg *Config) (ok bool, err error) {
+ b.cfg = cfg
+ b.queues = make(map[*jobs.Pipeline]*queue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+ }
+
+ q, err := newQueue(pipe, b.throw)
+ if err != nil {
+ return err
+ }
+
+ b.queues[pipe] = q
+
+ return nil
+}
+
+// Serve broker pipelines.
+func (b *Broker) Serve() (err error) {
+ b.mu.Lock()
+
+ if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.publish.Close()
+
+ if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.consume.Close()
+
+ for _, q := range b.queues {
+ err := q.declare(b.publish, q.name, q.key, nil)
+ if err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ }
+
+ for _, q := range b.queues {
+ qq := q
+ if qq.execPool != nil {
+ go qq.serve(b.publish, b.consume)
+ }
+ }
+
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
+// Stop all pipelines.
+func (b *Broker) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return
+ }
+
+ for _, q := range b.queues {
+ q.stop()
+ }
+
+ close(b.wait)
+ <-b.stopped
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ q.stop()
+
+ q.execPool = execPool
+ q.errHandler = errHandler
+
+ if b.publish != nil && q.execPool != nil {
+ if q.execPool != nil {
+ go q.serve(b.publish, b.consume)
+ }
+ }
+
+ return nil
+}
+
+// Push job into the worker.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ id, err := uuid.NewV4()
+ if err != nil {
+ return "", err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
+ return "", err
+ }
+
+ return id.String(), nil
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ queue, err := q.inspect(b.publish)
+ if err != nil {
+ return nil, err
+ }
+
+ // this the closest approximation we can get for now
+ return &jobs.Stat{
+ InternalName: queue.Name,
+ Queue: int64(queue.Messages),
+ Active: int64(atomic.LoadInt32(&q.running)),
+ }, nil
+}
+
+// check if broker is serving
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// queue returns queue associated with the pipeline.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
+// throw handles service, server and pool events.
+func (b *Broker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go
new file mode 100644
index 00000000..66078099
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/broker_test.go
@@ -0,0 +1,419 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+var (
+ pipe = &jobs.Pipeline{
+ "broker": "amqp",
+ "name": "default",
+ "queue": "rr-queue",
+ "exchange": "rr-exchange",
+ "prefetch": 1,
+ }
+
+ cfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5672/",
+ }
+)
+
+var (
+ fanoutPipe = &jobs.Pipeline{
+ "broker": "amqp",
+ "name": "fanout",
+ "queue": "fanout-queue",
+ "exchange": "fanout-exchange",
+ "exchange-type": "fanout",
+ "prefetch": 1,
+ }
+
+ fanoutCfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5672/",
+ }
+)
+
+func TestBroker_Init(t *testing.T) {
+ b := &Broker{}
+ ok, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.True(t, ok)
+ assert.NoError(t, err)
+}
+
+func TestBroker_StopNotStarted(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ b.Stop()
+}
+
+func TestBroker_Register(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+}
+
+func TestBroker_Register_Twice(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+ assert.Error(t, b.Register(pipe))
+}
+
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_Undefined(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+func TestBroker_Consume_BadPipeline(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Error(t, b.Register(&jobs.Pipeline{
+ "broker": "amqp",
+ "name": "default",
+ "exchange": "rr-exchange",
+ "prefetch": 1,
+ }))
+}
+
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Consume(pipe, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_CantStart(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(&Config{
+ Addr: "amqp://guest:guest@localhost:15672/",
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Serve())
+}
+
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ err = b.Consume(pipe, exec, errf)
+ if err != nil {
+ t.Fatal()
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_PushToNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_PushToNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_Queue_RoutingKey(t *testing.T) {
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
+}
+
+func TestBroker_Register_With_RoutingKey(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ assert.NoError(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Consume_With_RoutingKey(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ err = b.Register(&pipeWithKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(&pipeWithKey, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Queue_ExchangeType(t *testing.T) {
+ pipeWithKey := pipe.With("exchange-type", "direct")
+
+ assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
+}
+
+func TestBroker_Register_With_ExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("exchange-type", "fanout")
+
+ assert.NoError(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("exchange-type", "xxx")
+
+ assert.Error(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Consume_With_ExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(fanoutCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
+
+ err = b.Register(&pipeWithKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(&pipeWithKey, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/oooold/broker/amqp/config.go
new file mode 100644
index 00000000..0ed3a50e
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/config.go
@@ -0,0 +1,39 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Config defines sqs broker configuration.
+type Config struct {
+ // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
+ Addr string
+
+ // Timeout to allocate the connection. Default 10 seconds.
+ Timeout int
+}
+
+// Hydrate config values.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.Addr == "" {
+ return fmt.Errorf("AMQP address is missing")
+ }
+
+ return nil
+}
+
+// TimeoutDuration returns number of seconds allowed to redial
+func (c *Config) TimeoutDuration() time.Duration {
+ timeout := c.Timeout
+ if timeout == 0 {
+ timeout = 10
+ }
+
+ return time.Duration(timeout) * time.Second
+}
diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go
new file mode 100644
index 00000000..1abbb55d
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/config_test.go
@@ -0,0 +1,27 @@
+package amqp
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &mockCfg{`{"dead`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"addr":""}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go
new file mode 100644
index 00000000..be747776
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/conn.go
@@ -0,0 +1,232 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/streadway/amqp"
+ "sync"
+ "time"
+)
+
+// manages set of AMQP channels
+type chanPool struct {
+ // timeout to backoff redial
+ tout time.Duration
+ url string
+
+ mu *sync.Mutex
+
+ conn *amqp.Connection
+ channels map[string]*channel
+ wait chan interface{}
+ connected chan interface{}
+}
+
+// manages single channel
+type channel struct {
+ ch *amqp.Channel
+ // todo unused
+ //consumer string
+ confirm chan amqp.Confirmation
+ signal chan error
+}
+
+// newConn creates new watched AMQP connection
+func newConn(url string, tout time.Duration) (*chanPool, error) {
+ conn, err := dial(url)
+ if err != nil {
+ return nil, err
+ }
+
+ cp := &chanPool{
+ url: url,
+ tout: tout,
+ conn: conn,
+ mu: &sync.Mutex{},
+ channels: make(map[string]*channel),
+ wait: make(chan interface{}),
+ connected: make(chan interface{}),
+ }
+
+ close(cp.connected)
+ go cp.watch()
+ return cp, nil
+}
+
+// dial dials to AMQP.
+func dial(url string) (*amqp.Connection, error) {
+ return amqp.Dial(url)
+}
+
+// Close gracefully closes all underlying channels and connection.
+func (cp *chanPool) Close() error {
+ cp.mu.Lock()
+
+ close(cp.wait)
+ if cp.channels == nil {
+ return fmt.Errorf("connection is dead")
+ }
+
+ // close all channels and consume
+ var wg sync.WaitGroup
+ for _, ch := range cp.channels {
+ wg.Add(1)
+
+ go func(ch *channel) {
+ defer wg.Done()
+ cp.closeChan(ch, nil)
+ }(ch)
+ }
+ cp.mu.Unlock()
+
+ wg.Wait()
+
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ if cp.conn != nil {
+ return cp.conn.Close()
+ }
+
+ return nil
+}
+
+// waitConnected waits till connection is connected again or eventually closed.
+// must only be invoked after connection error has been delivered to channel.signal.
+func (cp *chanPool) waitConnected() chan interface{} {
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ return cp.connected
+}
+
+// watch manages connection state and reconnects if needed
+func (cp *chanPool) watch() {
+ for {
+ select {
+ case <-cp.wait:
+ // connection has been closed
+ return
+ // here we are waiting for the errors from amqp connection
+ case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
+ cp.mu.Lock()
+ // clear connected, since connections are dead
+ cp.connected = make(chan interface{})
+
+ // broadcast error to all consume to let them for the tryReconnect
+ for _, ch := range cp.channels {
+ ch.signal <- err
+ }
+
+ // disable channel allocation while server is dead
+ cp.conn = nil
+ cp.channels = nil
+
+ // initialize the backoff
+ expb := backoff.NewExponentialBackOff()
+ expb.MaxInterval = cp.tout
+ cp.mu.Unlock()
+
+ // reconnect function
+ reconnect := func() error {
+ cp.mu.Lock()
+ conn, err := dial(cp.url)
+ if err != nil {
+ // still failing
+ fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
+ cp.mu.Unlock()
+ return err
+ }
+
+ // TODO ADD LOGGING
+ fmt.Println("------amqp successfully redialed------")
+
+ // here we are reconnected
+ // replace the connection
+ cp.conn = conn
+ // re-init the channels
+ cp.channels = make(map[string]*channel)
+ cp.mu.Unlock()
+ return nil
+ }
+
+ // start backoff retry
+ errb := backoff.Retry(reconnect, expb)
+ if errb != nil {
+ fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
+ // reconnection failed
+ close(cp.connected)
+ return
+ }
+ close(cp.connected)
+ }
+ }
+}
+
+// channel allocates new channel on amqp connection
+func (cp *chanPool) channel(name string) (*channel, error) {
+ cp.mu.Lock()
+ dead := cp.conn == nil
+ cp.mu.Unlock()
+
+ if dead {
+ // wait for connection restoration (doubled the timeout duration)
+ select {
+ case <-time.NewTimer(cp.tout * 2).C:
+ return nil, fmt.Errorf("connection is dead")
+ case <-cp.connected:
+ // connected
+ }
+ }
+
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ if cp.conn == nil {
+ return nil, fmt.Errorf("connection has been closed")
+ }
+
+ if ch, ok := cp.channels[name]; ok {
+ return ch, nil
+ }
+
+ // we must create new channel
+ ch, err := cp.conn.Channel()
+ if err != nil {
+ return nil, err
+ }
+
+ // Enable publish confirmations
+ if err = ch.Confirm(false); err != nil {
+ return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err)
+ }
+
+ // we expect that every allocated channel would have listener on signal
+ // this is not true only in case of pure producing channels
+ cp.channels[name] = &channel{
+ ch: ch,
+ confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
+ signal: make(chan error, 1),
+ }
+
+ return cp.channels[name], nil
+}
+
+// closeChan gracefully closes and removes channel allocation.
+func (cp *chanPool) closeChan(c *channel, err error) error {
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ go func() {
+ c.signal <- nil
+ c.ch.Close()
+ }()
+
+ for name, ch := range cp.channels {
+ if ch == c {
+ delete(cp.channels, name)
+ }
+ }
+
+ return err
+}
diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go
new file mode 100644
index 00000000..28999c36
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/consume_test.go
@@ -0,0 +1,258 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Consume_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_Delayed(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ start := time.Now()
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Delay: 1},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ elapsed := time.Since(start)
+ assert.True(t, elapsed >= time.Second)
+ assert.True(t, elapsed < 3*time.Second)
+}
+
+func TestBroker_Consume_Errored(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ close(errHandled)
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return fmt.Errorf("job failed")
+ }
+
+ <-waitJob
+ <-errHandled
+}
+
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ attempts := 0
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ attempts++
+ errHandled <- nil
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Attempts: 3},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ return fmt.Errorf("job failed")
+ }
+
+ <-errHandled
+ <-errHandled
+ <-errHandled
+ assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go
new file mode 100644
index 00000000..00d62c51
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/durability_test.go
@@ -0,0 +1,728 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
+var (
+ proxyCfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5673/",
+ Timeout: 1,
+ }
+
+ proxy = &tcpProxy{
+ listen: "localhost:5673",
+ upstream: "localhost:5672",
+ accept: true,
+ }
+)
+
+type tcpProxy struct {
+ listen string
+ upstream string
+ mu sync.Mutex
+ accept bool
+ conn []net.Conn
+}
+
+func (p *tcpProxy) serve() {
+ l, err := net.Listen("tcp", p.listen)
+ if err != nil {
+ panic(err)
+ }
+
+ for {
+ in, err := l.Accept()
+ if err != nil {
+ panic(err)
+ }
+
+ if !p.accepting() {
+ in.Close()
+ }
+
+ up, err := net.Dial("tcp", p.upstream)
+ if err != nil {
+ panic(err)
+ }
+
+ go io.Copy(in, up)
+ go io.Copy(up, in)
+
+ p.mu.Lock()
+ p.conn = append(p.conn, in, up)
+ p.mu.Unlock()
+ }
+}
+
+// wait for specific number of connections
+func (p *tcpProxy) waitConn(count int) *tcpProxy {
+ p.mu.Lock()
+ p.accept = true
+ p.mu.Unlock()
+
+ for {
+ p.mu.Lock()
+ current := len(p.conn)
+ p.mu.Unlock()
+
+ if current >= count*2 {
+ break
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+
+ return p
+}
+
+func (p *tcpProxy) reset(accept bool) int {
+ p.mu.Lock()
+ p.accept = accept
+ defer p.mu.Unlock()
+
+ count := 0
+ for _, conn := range p.conn {
+ conn.Close()
+ count++
+ }
+
+ p.conn = nil
+ return count / 2
+}
+
+func (p *tcpProxy) accepting() bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.accept
+}
+
+func init() {
+ go proxy.serve()
+}
+
+func TestBroker_Durability_Base(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ // expect 2 connections
+ proxy.waitConn(2)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Durability_Consume(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ proxy.waitConn(2).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ proxy.waitConn(1).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ time.Sleep(3 * time.Second)
+ proxy.waitConn(1)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NotEqual(t, "0", jid)
+
+ assert.NoError(t, perr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume2(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ proxy.waitConn(2).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+ if perr != nil {
+ panic(perr)
+ }
+
+ proxy.reset(true)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume2_2(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ proxy.waitConn(2).reset(false)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // start when connection is dead
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+ if perr != nil {
+ panic(perr)
+ }
+
+ proxy.reset(false)
+
+ _, serr := b.Stat(pipe)
+ assert.Error(t, serr)
+
+ proxy.reset(true)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume3(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ proxy.waitConn(2)
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+ if perr != nil {
+ panic(perr)
+ }
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume4(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ ch, err := b.consume.channel("purger")
+ if err != nil {
+ panic(err)
+ }
+ _, err = ch.ch.QueuePurge("rr-queue", false)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ proxy.waitConn(2)
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "kill",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+
+ if j.Payload == "kill" && len(done) == 0 {
+ proxy.reset(true)
+ }
+
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 3 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_StopDead(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+
+ <-ready
+
+ proxy.waitConn(2).reset(false)
+
+ b.Stop()
+}
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go
new file mode 100644
index 00000000..bd559715
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/job.go
@@ -0,0 +1,56 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/streadway/amqp"
+)
+
+// pack job metadata into headers
+func pack(id string, attempt int, j *jobs.Job) amqp.Table {
+ return amqp.Table{
+ "rr-id": id,
+ "rr-job": j.Job,
+ "rr-attempt": int64(attempt),
+ "rr-maxAttempts": int64(j.Options.Attempts),
+ "rr-timeout": int64(j.Options.Timeout),
+ "rr-delay": int64(j.Options.Delay),
+ "rr-retryDelay": int64(j.Options.RetryDelay),
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
+ j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
+
+ if _, ok := d.Headers["rr-id"].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
+ }
+
+ if _, ok := d.Headers["rr-attempt"].(int64); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
+ }
+
+ if _, ok := d.Headers["rr-job"].(string); !ok {
+ return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
+ }
+ j.Job = d.Headers["rr-job"].(string)
+
+ if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
+ j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-timeout"].(int64); ok {
+ j.Options.Timeout = int(d.Headers["rr-timeout"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-delay"].(int64); ok {
+ j.Options.Delay = int(d.Headers["rr-delay"].(int64))
+ }
+
+ if _, ok := d.Headers["rr-retryDelay"].(int64); ok {
+ j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64))
+ }
+
+ return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil
+}
diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go
new file mode 100644
index 00000000..24ca453b
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/job_test.go
@@ -0,0 +1,29 @@
+package amqp
+
+import (
+ "github.com/streadway/amqp"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_Unpack_Errors(t *testing.T) {
+ _, _, _, err := unpack(amqp.Delivery{
+ Headers: map[string]interface{}{},
+ })
+ assert.Error(t, err)
+
+ _, _, _, err = unpack(amqp.Delivery{
+ Headers: map[string]interface{}{
+ "rr-id": "id",
+ },
+ })
+ assert.Error(t, err)
+
+ _, _, _, err = unpack(amqp.Delivery{
+ Headers: map[string]interface{}{
+ "rr-id": "id",
+ "rr-attempt": int64(0),
+ },
+ })
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go
new file mode 100644
index 00000000..6ef5f20f
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/queue.go
@@ -0,0 +1,302 @@
+package amqp
+
+import (
+ "errors"
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/streadway/amqp"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type ExchangeType string
+
+const (
+ Direct ExchangeType = "direct"
+ Fanout ExchangeType = "fanout"
+ Topic ExchangeType = "topic"
+ Headers ExchangeType = "headers"
+)
+
+func (et ExchangeType) IsValid() error {
+ switch et {
+ case Direct, Fanout, Topic, Headers:
+ return nil
+ }
+ return errors.New("unknown exchange-type")
+}
+
+func (et ExchangeType) String() string {
+ switch et {
+ case Direct, Fanout, Topic, Headers:
+ return string(et)
+ default:
+ return "direct"
+ }
+}
+
+
+type queue struct {
+ active int32
+ pipe *jobs.Pipeline
+ exchange string
+ exchangeType ExchangeType
+ name, key string
+ consumer string
+
+ // active consuming channel
+ muc sync.Mutex
+ cc *channel
+
+ // queue events
+ lsn func(event int, ctx interface{})
+
+ // active operations
+ muw sync.RWMutex
+ wg sync.WaitGroup
+
+ // exec handlers
+ running int32
+ execPool chan jobs.Handler
+ errHandler jobs.ErrorHandler
+}
+
+// newQueue creates new queue wrapper for AMQP.
+func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
+ if pipe.String("queue", "") == "" {
+ return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
+ }
+
+ exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
+
+ err := exchangeType.IsValid()
+ if err != nil {
+ return nil, fmt.Errorf(err.Error())
+ }
+
+ return &queue{
+ exchange: pipe.String("exchange", "amqp.direct"),
+ exchangeType: exchangeType,
+ name: pipe.String("queue", ""),
+ key: pipe.String("routing-key", pipe.String("queue", "")),
+ consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
+ pipe: pipe,
+ lsn: lsn,
+ }, nil
+}
+
+// serve consumes queue
+func (q *queue) serve(publish, consume *chanPool) {
+ atomic.StoreInt32(&q.active, 1)
+
+ for {
+ <-consume.waitConnected()
+ if atomic.LoadInt32(&q.active) == 0 {
+ // stopped
+ return
+ }
+
+ delivery, cc, err := q.consume(consume)
+ if err != nil {
+ q.report(err)
+ continue
+ }
+
+ q.muc.Lock()
+ q.cc = cc
+ q.muc.Unlock()
+
+ for d := range delivery {
+ q.muw.Lock()
+ q.wg.Add(1)
+ q.muw.Unlock()
+
+ atomic.AddInt32(&q.running, 1)
+ h := <-q.execPool
+
+ go func(h jobs.Handler, d amqp.Delivery) {
+ err := q.do(publish, h, d)
+
+ atomic.AddInt32(&q.running, ^int32(0))
+ q.execPool <- h
+ q.wg.Done()
+ q.report(err)
+ }(h, d)
+ }
+ }
+}
+
+func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) {
+ // allocate channel for the consuming
+ if cc, err = consume.channel(q.name); err != nil {
+ return nil, nil, err
+ }
+
+ if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil {
+ return nil, nil, consume.closeChan(cc, err)
+ }
+
+ delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil)
+ if err != nil {
+ return nil, nil, consume.closeChan(cc, err)
+ }
+
+ // do i like it?
+ go func(consume *chanPool) {
+ for err := range cc.signal {
+ consume.closeChan(cc, err)
+ return
+ }
+ }(consume)
+
+ return delivery, cc, err
+}
+
+func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error {
+ id, attempt, j, err := unpack(d)
+ if err != nil {
+ q.report(err)
+ return d.Nack(false, false)
+ }
+ err = h(id, j)
+
+ if err == nil {
+ return d.Ack(false)
+ }
+
+ // failed
+ q.errHandler(id, j, err)
+
+ if !j.Options.CanRetry(attempt) {
+ return d.Nack(false, false)
+ }
+
+ // retry as new j (to accommodate attempt number and new delay)
+ if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil {
+ q.report(err)
+ return d.Nack(false, true)
+ }
+
+ return d.Ack(false)
+}
+
+func (q *queue) stop() {
+ if atomic.LoadInt32(&q.active) == 0 {
+ return
+ }
+
+ atomic.StoreInt32(&q.active, 0)
+
+ q.muc.Lock()
+ if q.cc != nil {
+ // gracefully stopped consuming
+ q.report(q.cc.ch.Cancel(q.consumer, true))
+ }
+ q.muc.Unlock()
+
+ q.muw.Lock()
+ q.wg.Wait()
+ q.muw.Unlock()
+}
+
+// publish message to queue or to delayed queue.
+func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error {
+ c, err := cp.channel(q.name)
+ if err != nil {
+ return err
+ }
+
+ qKey := q.key
+
+ if delay != 0 {
+ delayMs := int64(delay.Seconds() * 1000)
+ qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
+ qKey = qName
+
+ err := q.declare(cp, qName, qName, amqp.Table{
+ "x-dead-letter-exchange": q.exchange,
+ "x-dead-letter-routing-key": q.name,
+ "x-message-ttl": delayMs,
+ "x-expires": delayMs * 2,
+ })
+
+ if err != nil {
+ return err
+ }
+ }
+
+ err = c.ch.Publish(
+ q.exchange, // exchange
+ qKey, // routing key
+ false, // mandatory
+ false, // immediate
+ amqp.Publishing{
+ ContentType: "application/octet-stream",
+ Body: j.Body(),
+ DeliveryMode: amqp.Persistent,
+ Headers: pack(id, attempt, j),
+ },
+ )
+
+ if err != nil {
+ return cp.closeChan(c, err)
+ }
+
+ confirmed, ok := <-c.confirm
+ if ok && confirmed.Ack {
+ return nil
+ }
+
+ return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag)
+}
+
+// declare queue and binding to it
+func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error {
+ c, err := cp.channel(q.name)
+ if err != nil {
+ return err
+ }
+
+ err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil)
+ if err != nil {
+ return cp.closeChan(c, err)
+ }
+
+ _, err = c.ch.QueueDeclare(queue, true, false, false, false, args)
+ if err != nil {
+ return cp.closeChan(c, err)
+ }
+
+ err = c.ch.QueueBind(queue, key, q.exchange, false, nil)
+ if err != nil {
+ return cp.closeChan(c, err)
+ }
+
+ // keep channel open
+ return err
+}
+
+// inspect the queue
+func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
+ c, err := cp.channel("stat")
+ if err != nil {
+ return nil, err
+ }
+
+ queue, err := c.ch.QueueInspect(q.name)
+ if err != nil {
+ return nil, cp.closeChan(c, err)
+ }
+
+ // keep channel open
+ return &queue, err
+}
+
+// throw handles service, server and pool events.
+func (q *queue) report(err error) {
+ if err != nil {
+ q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
+ }
+}
diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go
new file mode 100644
index 00000000..ef19746c
--- /dev/null
+++ b/plugins/jobs/oooold/broker/amqp/stat_test.go
@@ -0,0 +1,63 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "testing"
+)
+
+func TestBroker_Stat(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ b.Register(pipe)
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ exec <- func(id string, j *jobs.Job) error {
+ defer wg.Done()
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Active)
+
+ return nil
+ }
+
+ wg.Wait()
+ stat, err = b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+}