summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp/broker_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/broker_test.go')
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker_test.go419
1 files changed, 0 insertions, 419 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go
deleted file mode 100644
index 66078099..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker_test.go
+++ /dev/null
@@ -1,419 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "queue": "rr-queue",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }
-
- cfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-var (
- fanoutPipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "fanout",
- "queue": "fanout-queue",
- "exchange": "fanout-exchange",
- "exchange-type": "fanout",
- "prefetch": 1,
- }
-
- fanoutCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_BadPipeline(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_CantStart(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "amqp://guest:guest@localhost:15672/",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal()
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_Queue_RoutingKey(t *testing.T) {
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
-}
-
-func TestBroker_Register_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Queue_ExchangeType(t *testing.T) {
- pipeWithKey := pipe.With("exchange-type", "direct")
-
- assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
-}
-
-func TestBroker_Register_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "fanout")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "xxx")
-
- assert.Error(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(fanoutCfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}