diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/amqp/broker_test.go')
-rw-r--r-- | plugins/jobs/oooold/broker/amqp/broker_test.go | 419 |
1 files changed, 419 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go new file mode 100644 index 00000000..66078099 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/broker_test.go @@ -0,0 +1,419 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "queue": "rr-queue", + "exchange": "rr-exchange", + "prefetch": 1, + } + + cfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +var ( + fanoutPipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "fanout", + "queue": "fanout-queue", + "exchange": "fanout-exchange", + "exchange-type": "fanout", + "prefetch": 1, + } + + fanoutCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_BadPipeline(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "exchange": "rr-exchange", + "prefetch": 1, + })) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_CantStart(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "amqp://guest:guest@localhost:15672/", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal() + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_Queue_RoutingKey(t *testing.T) { + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") +} + +func TestBroker_Register_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Queue_ExchangeType(t *testing.T) { + pipeWithKey := pipe.With("exchange-type", "direct") + + assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") +} + +func TestBroker_Register_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "fanout") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Register_With_WrongExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "xxx") + + assert.Error(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(fanoutCfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := fanoutPipe.With("exchange-type", "fanout") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} |