diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker.go | 216 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker_test.go | 419 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/conn.go | 232 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/consume_test.go | 258 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/durability_test.go | 728 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/job.go | 56 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/job_test.go | 29 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/queue.go | 302 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/stat_test.go | 63 |
9 files changed, 0 insertions, 2303 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go deleted file mode 100644 index b47d83ee..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker.go +++ /dev/null @@ -1,216 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" -) - -// Broker represents AMQP broker. -type Broker struct { - cfg *Config - lsn func(event int, ctx interface{}) - publish *chanPool - consume *chanPool - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures AMQP job broker (always 2 connections). -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.publish.Close() - - if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { - b.mu.Unlock() - return err - } - defer b.consume.Close() - - for _, q := range b.queues { - err := q.declare(b.publish, q.name, q.key, nil) - if err != nil { - b.mu.Unlock() - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.publish, b.consume) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.publish != nil && q.execPool != nil { - if q.execPool != nil { - go q.serve(b.publish, b.consume) - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { - return "", err - } - - return id.String(), nil -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - queue, err := q.inspect(b.publish) - if err != nil { - return nil, err - } - - // this the closest approximation we can get for now - return &jobs.Stat{ - InternalName: queue.Name, - Queue: int64(queue.Messages), - Active: int64(atomic.LoadInt32(&q.running)), - }, nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go deleted file mode 100644 index 66078099..00000000 --- a/plugins/jobs/oooold/broker/amqp/broker_test.go +++ /dev/null @@ -1,419 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "queue": "rr-queue", - "exchange": "rr-exchange", - "prefetch": 1, - } - - cfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -var ( - fanoutPipe = &jobs.Pipeline{ - "broker": "amqp", - "name": "fanout", - "queue": "fanout-queue", - "exchange": "fanout-exchange", - "exchange-type": "fanout", - "prefetch": 1, - } - - fanoutCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5672/", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_BadPipeline(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "amqp", - "name": "default", - "exchange": "rr-exchange", - "prefetch": 1, - })) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_CantStart(t *testing.T) { - b := &Broker{} - _, err := b.Init(&Config{ - Addr: "amqp://guest:guest@localhost:15672/", - }) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Serve()) -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal() - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_Queue_RoutingKey(t *testing.T) { - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") -} - -func TestBroker_Register_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_RoutingKey(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Queue_ExchangeType(t *testing.T) { - pipeWithKey := pipe.With("exchange-type", "direct") - - assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") -} - -func TestBroker_Register_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "fanout") - - assert.NoError(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Register_With_WrongExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := pipe.With("exchange-type", "xxx") - - assert.Error(t, b.Register(&pipeWithKey)) -} - -func TestBroker_Consume_With_ExchangeType(t *testing.T) { - b := &Broker{} - _, err := b.Init(fanoutCfg) - if err != nil { - t.Fatal(err) - } - - pipeWithKey := fanoutPipe.With("exchange-type", "fanout") - - err = b.Register(&pipeWithKey) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(&pipeWithKey, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go deleted file mode 100644 index be747776..00000000 --- a/plugins/jobs/oooold/broker/amqp/conn.go +++ /dev/null @@ -1,232 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/cenkalti/backoff/v4" - "github.com/streadway/amqp" - "sync" - "time" -) - -// manages set of AMQP channels -type chanPool struct { - // timeout to backoff redial - tout time.Duration - url string - - mu *sync.Mutex - - conn *amqp.Connection - channels map[string]*channel - wait chan interface{} - connected chan interface{} -} - -// manages single channel -type channel struct { - ch *amqp.Channel - // todo unused - //consumer string - confirm chan amqp.Confirmation - signal chan error -} - -// newConn creates new watched AMQP connection -func newConn(url string, tout time.Duration) (*chanPool, error) { - conn, err := dial(url) - if err != nil { - return nil, err - } - - cp := &chanPool{ - url: url, - tout: tout, - conn: conn, - mu: &sync.Mutex{}, - channels: make(map[string]*channel), - wait: make(chan interface{}), - connected: make(chan interface{}), - } - - close(cp.connected) - go cp.watch() - return cp, nil -} - -// dial dials to AMQP. -func dial(url string) (*amqp.Connection, error) { - return amqp.Dial(url) -} - -// Close gracefully closes all underlying channels and connection. -func (cp *chanPool) Close() error { - cp.mu.Lock() - - close(cp.wait) - if cp.channels == nil { - return fmt.Errorf("connection is dead") - } - - // close all channels and consume - var wg sync.WaitGroup - for _, ch := range cp.channels { - wg.Add(1) - - go func(ch *channel) { - defer wg.Done() - cp.closeChan(ch, nil) - }(ch) - } - cp.mu.Unlock() - - wg.Wait() - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn != nil { - return cp.conn.Close() - } - - return nil -} - -// waitConnected waits till connection is connected again or eventually closed. -// must only be invoked after connection error has been delivered to channel.signal. -func (cp *chanPool) waitConnected() chan interface{} { - cp.mu.Lock() - defer cp.mu.Unlock() - - return cp.connected -} - -// watch manages connection state and reconnects if needed -func (cp *chanPool) watch() { - for { - select { - case <-cp.wait: - // connection has been closed - return - // here we are waiting for the errors from amqp connection - case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): - cp.mu.Lock() - // clear connected, since connections are dead - cp.connected = make(chan interface{}) - - // broadcast error to all consume to let them for the tryReconnect - for _, ch := range cp.channels { - ch.signal <- err - } - - // disable channel allocation while server is dead - cp.conn = nil - cp.channels = nil - - // initialize the backoff - expb := backoff.NewExponentialBackOff() - expb.MaxInterval = cp.tout - cp.mu.Unlock() - - // reconnect function - reconnect := func() error { - cp.mu.Lock() - conn, err := dial(cp.url) - if err != nil { - // still failing - fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) - cp.mu.Unlock() - return err - } - - // TODO ADD LOGGING - fmt.Println("------amqp successfully redialed------") - - // here we are reconnected - // replace the connection - cp.conn = conn - // re-init the channels - cp.channels = make(map[string]*channel) - cp.mu.Unlock() - return nil - } - - // start backoff retry - errb := backoff.Retry(reconnect, expb) - if errb != nil { - fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) - // reconnection failed - close(cp.connected) - return - } - close(cp.connected) - } - } -} - -// channel allocates new channel on amqp connection -func (cp *chanPool) channel(name string) (*channel, error) { - cp.mu.Lock() - dead := cp.conn == nil - cp.mu.Unlock() - - if dead { - // wait for connection restoration (doubled the timeout duration) - select { - case <-time.NewTimer(cp.tout * 2).C: - return nil, fmt.Errorf("connection is dead") - case <-cp.connected: - // connected - } - } - - cp.mu.Lock() - defer cp.mu.Unlock() - - if cp.conn == nil { - return nil, fmt.Errorf("connection has been closed") - } - - if ch, ok := cp.channels[name]; ok { - return ch, nil - } - - // we must create new channel - ch, err := cp.conn.Channel() - if err != nil { - return nil, err - } - - // Enable publish confirmations - if err = ch.Confirm(false); err != nil { - return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) - } - - // we expect that every allocated channel would have listener on signal - // this is not true only in case of pure producing channels - cp.channels[name] = &channel{ - ch: ch, - confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), - signal: make(chan error, 1), - } - - return cp.channels[name], nil -} - -// closeChan gracefully closes and removes channel allocation. -func (cp *chanPool) closeChan(c *channel, err error) error { - cp.mu.Lock() - defer cp.mu.Unlock() - - go func() { - c.signal <- nil - c.ch.Close() - }() - - for name, ch := range cp.channels { - if ch == c { - delete(cp.channels, name) - } - } - - return err -} diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go deleted file mode 100644 index 28999c36..00000000 --- a/plugins/jobs/oooold/broker/amqp/consume_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - start := time.Now() - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed >= time.Second) - assert.True(t, elapsed < 3*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go deleted file mode 100644 index 00d62c51..00000000 --- a/plugins/jobs/oooold/broker/amqp/durability_test.go +++ /dev/null @@ -1,728 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Addr: "amqp://guest:guest@localhost:5673/", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:5673", - upstream: "localhost:5672", - accept: true, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // expect 2 connections - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NotEqual(t, "0", jid) - - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2_2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - proxy.waitConn(2).reset(false) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // start when connection is dead - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - proxy.reset(false) - - _, serr := b.Stat(pipe) - assert.Error(t, serr) - - proxy.reset(true) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - if perr != nil { - panic(perr) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - ch, err := b.consume.channel("purger") - if err != nil { - panic(err) - } - _, err = ch.ch.QueuePurge("rr-queue", false) - if err != nil { - panic(err) - } - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - proxy.waitConn(2) - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - if err != nil { - t.Fatal(err) - } - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - - if j.Payload == "kill" && len(done) == 0 { - proxy.reset(true) - } - - mu.Lock() - defer mu.Unlock() - done[id] = true - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(2).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go deleted file mode 100644 index bd559715..00000000 --- a/plugins/jobs/oooold/broker/amqp/job.go +++ /dev/null @@ -1,56 +0,0 @@ -package amqp - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" -) - -// pack job metadata into headers -func pack(id string, attempt int, j *jobs.Job) amqp.Table { - return amqp.Table{ - "rr-id": id, - "rr-job": j.Job, - "rr-attempt": int64(attempt), - "rr-maxAttempts": int64(j.Options.Attempts), - "rr-timeout": int64(j.Options.Timeout), - "rr-delay": int64(j.Options.Delay), - "rr-retryDelay": int64(j.Options.RetryDelay), - } -} - -// unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { - j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} - - if _, ok := d.Headers["rr-id"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") - } - - if _, ok := d.Headers["rr-attempt"].(int64); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") - } - - if _, ok := d.Headers["rr-job"].(string); !ok { - return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") - } - j.Job = d.Headers["rr-job"].(string) - - if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { - j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) - } - - if _, ok := d.Headers["rr-timeout"].(int64); ok { - j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) - } - - if _, ok := d.Headers["rr-delay"].(int64); ok { - j.Options.Delay = int(d.Headers["rr-delay"].(int64)) - } - - if _, ok := d.Headers["rr-retryDelay"].(int64); ok { - j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) - } - - return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil -} diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go deleted file mode 100644 index 24ca453b..00000000 --- a/plugins/jobs/oooold/broker/amqp/job_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package amqp - -import ( - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack_Errors(t *testing.T) { - _, _, _, err := unpack(amqp.Delivery{ - Headers: map[string]interface{}{}, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - }, - }) - assert.Error(t, err) - - _, _, _, err = unpack(amqp.Delivery{ - Headers: map[string]interface{}{ - "rr-id": "id", - "rr-attempt": int64(0), - }, - }) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go deleted file mode 100644 index 6ef5f20f..00000000 --- a/plugins/jobs/oooold/broker/amqp/queue.go +++ /dev/null @@ -1,302 +0,0 @@ -package amqp - -import ( - "errors" - "fmt" - "github.com/spiral/jobs/v2" - "github.com/streadway/amqp" - "os" - "sync" - "sync/atomic" - "time" -) - -type ExchangeType string - -const ( - Direct ExchangeType = "direct" - Fanout ExchangeType = "fanout" - Topic ExchangeType = "topic" - Headers ExchangeType = "headers" -) - -func (et ExchangeType) IsValid() error { - switch et { - case Direct, Fanout, Topic, Headers: - return nil - } - return errors.New("unknown exchange-type") -} - -func (et ExchangeType) String() string { - switch et { - case Direct, Fanout, Topic, Headers: - return string(et) - default: - return "direct" - } -} - - -type queue struct { - active int32 - pipe *jobs.Pipeline - exchange string - exchangeType ExchangeType - name, key string - consumer string - - // active consuming channel - muc sync.Mutex - cc *channel - - // queue events - lsn func(event int, ctx interface{}) - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - running int32 - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -// newQueue creates new queue wrapper for AMQP. -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") - } - - exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) - - err := exchangeType.IsValid() - if err != nil { - return nil, fmt.Errorf(err.Error()) - } - - return &queue{ - exchange: pipe.String("exchange", "amqp.direct"), - exchangeType: exchangeType, - name: pipe.String("queue", ""), - key: pipe.String("routing-key", pipe.String("queue", "")), - consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), - pipe: pipe, - lsn: lsn, - }, nil -} - -// serve consumes queue -func (q *queue) serve(publish, consume *chanPool) { - atomic.StoreInt32(&q.active, 1) - - for { - <-consume.waitConnected() - if atomic.LoadInt32(&q.active) == 0 { - // stopped - return - } - - delivery, cc, err := q.consume(consume) - if err != nil { - q.report(err) - continue - } - - q.muc.Lock() - q.cc = cc - q.muc.Unlock() - - for d := range delivery { - q.muw.Lock() - q.wg.Add(1) - q.muw.Unlock() - - atomic.AddInt32(&q.running, 1) - h := <-q.execPool - - go func(h jobs.Handler, d amqp.Delivery) { - err := q.do(publish, h, d) - - atomic.AddInt32(&q.running, ^int32(0)) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, d) - } - } -} - -func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { - // allocate channel for the consuming - if cc, err = consume.channel(q.name); err != nil { - return nil, nil, err - } - - if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) - if err != nil { - return nil, nil, consume.closeChan(cc, err) - } - - // do i like it? - go func(consume *chanPool) { - for err := range cc.signal { - consume.closeChan(cc, err) - return - } - }(consume) - - return delivery, cc, err -} - -func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { - id, attempt, j, err := unpack(d) - if err != nil { - q.report(err) - return d.Nack(false, false) - } - err = h(id, j) - - if err == nil { - return d.Ack(false) - } - - // failed - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return d.Nack(false, false) - } - - // retry as new j (to accommodate attempt number and new delay) - if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { - q.report(err) - return d.Nack(false, true) - } - - return d.Ack(false) -} - -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - q.muc.Lock() - if q.cc != nil { - // gracefully stopped consuming - q.report(q.cc.ch.Cancel(q.consumer, true)) - } - q.muc.Unlock() - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// publish message to queue or to delayed queue. -func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - qKey := q.key - - if delay != 0 { - delayMs := int64(delay.Seconds() * 1000) - qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) - qKey = qName - - err := q.declare(cp, qName, qName, amqp.Table{ - "x-dead-letter-exchange": q.exchange, - "x-dead-letter-routing-key": q.name, - "x-message-ttl": delayMs, - "x-expires": delayMs * 2, - }) - - if err != nil { - return err - } - } - - err = c.ch.Publish( - q.exchange, // exchange - qKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/octet-stream", - Body: j.Body(), - DeliveryMode: amqp.Persistent, - Headers: pack(id, attempt, j), - }, - ) - - if err != nil { - return cp.closeChan(c, err) - } - - confirmed, ok := <-c.confirm - if ok && confirmed.Ack { - return nil - } - - return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) -} - -// declare queue and binding to it -func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { - c, err := cp.channel(q.name) - if err != nil { - return err - } - - err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) - if err != nil { - return cp.closeChan(c, err) - } - - err = c.ch.QueueBind(queue, key, q.exchange, false, nil) - if err != nil { - return cp.closeChan(c, err) - } - - // keep channel open - return err -} - -// inspect the queue -func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { - c, err := cp.channel("stat") - if err != nil { - return nil, err - } - - queue, err := c.ch.QueueInspect(q.name) - if err != nil { - return nil, cp.closeChan(c, err) - } - - // keep channel open - return &queue, err -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go deleted file mode 100644 index ef19746c..00000000 --- a/plugins/jobs/oooold/broker/amqp/stat_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package amqp - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "sync" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - b.Register(pipe) - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - wg := &sync.WaitGroup{} - wg.Add(1) - exec <- func(id string, j *jobs.Job) error { - defer wg.Done() - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - return nil - } - - wg.Wait() - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} |