diff options
author | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-16 12:56:02 +0300 |
commit | cee4bc46097506d6e892b6af194751434700621a (patch) | |
tree | e542d1b2f963c2aa0e304703c82ff4f04203b169 /plugins/jobs/oooold/broker/beanstalk/consume_test.go | |
parent | d4c92e48bada7593b6fbec612a742c599de6e736 (diff) |
- Update jobs sources
- Update Arch diagramm
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/beanstalk/consume_test.go')
-rw-r--r-- | plugins/jobs/oooold/broker/beanstalk/consume_test.go | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go new file mode 100644 index 00000000..b16866ae --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/consume_test.go @@ -0,0 +1,242 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} |