diff options
author | Valery Piashchynski <[email protected]> | 2021-06-15 22:12:32 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-15 22:12:32 +0300 |
commit | d4c92e48bada7593b6fbec612a742c599de6e736 (patch) | |
tree | 53b6fb81987953b71a77ae094e579a0a7daa407c /plugins/jobs/broker/sqs | |
parent | 9dc98d43b0c0de3e1e1bd8fdc97c122c7c7c594f (diff) |
- Jobs plugin initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/broker/sqs')
-rw-r--r-- | plugins/jobs/broker/sqs/broker.go | 189 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/broker_test.go | 275 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/config.go | 82 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/config_test.go | 48 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/consume_test.go | 370 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/durability_test.go | 588 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/job.go | 80 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/job_test.go | 19 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/queue.go | 266 | ||||
-rw-r--r-- | plugins/jobs/broker/sqs/stat_test.go | 60 |
10 files changed, 1977 insertions, 0 deletions
diff --git a/plugins/jobs/broker/sqs/broker.go b/plugins/jobs/broker/sqs/broker.go new file mode 100644 index 00000000..8cc62b6b --- /dev/null +++ b/plugins/jobs/broker/sqs/broker.go @@ -0,0 +1,189 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker represents SQS broker. +type Broker struct { + cfg *Config + sqs *sqs.SQS + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures SQS broker. +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + b.sqs, err = b.cfg.SQS() + if err != nil { + return err + } + + for _, q := range b.queues { + q.url, err = q.declareQueue(b.sqs) + if err != nil { + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.sqs, b.cfg.TimeoutDuration()) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + b.wait <- nil + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.sqs != nil && q.execPool != nil { + go q.serve(b.sqs, b.cfg.TimeoutDuration()) + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { + return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) + } + + return q.send(b.sqs, j) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(b.sqs) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/sqs/broker_test.go b/plugins/jobs/broker/sqs/broker_test.go new file mode 100644 index 00000000..c87b302d --- /dev/null +++ b/plugins/jobs/broker/sqs/broker_test.go @@ -0,0 +1,275 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } + + cfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9324", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_RegisterInvalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "sqs", + "name": "default", + })) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "invalid", + "declare": map[string]interface{}{ + "VisibilityTimeout": "invalid", + }, + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + assert.Error(t, b.Serve()) +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/sqs/config.go b/plugins/jobs/broker/sqs/config.go new file mode 100644 index 00000000..d0c2f2b2 --- /dev/null +++ b/plugins/jobs/broker/sqs/config.go @@ -0,0 +1,82 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Region defined SQS region, not required when endpoint is not empty. + Region string + + // Region defined AWS API key, not required when endpoint is not empty. + Key string + + // Region defined AWS API secret, not required when endpoint is not empty. + Secret string + + // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. + Endpoint string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Region == "" { + return fmt.Errorf("SQS region is missing") + } + + if c.Key == "" { + return fmt.Errorf("SQS key is missing") + } + + if c.Secret == "" { + return fmt.Errorf("SQS secret is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// Session returns new AWS session. +func (c *Config) Session() (*session.Session, error) { + return session.NewSession(&aws.Config{ + Region: aws.String(c.Region), + Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), + }) +} + +// SQS returns new SQS instance or error. +func (c *Config) SQS() (*sqs.SQS, error) { + sess, err := c.Session() + if err != nil { + return nil, err + } + + if c.Endpoint == "" { + return sqs.New(sess), nil + } + + return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil +} diff --git a/plugins/jobs/broker/sqs/config_test.go b/plugins/jobs/broker/sqs/config_test.go new file mode 100644 index 00000000..b36b3c6f --- /dev/null +++ b/plugins/jobs/broker/sqs/config_test.go @@ -0,0 +1,48 @@ +package sqs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error5(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/broker/sqs/consume_test.go b/plugins/jobs/broker/sqs/consume_test.go new file mode 100644 index 00000000..434fc6ea --- /dev/null +++ b/plugins/jobs/broker/sqs/consume_test.go @@ -0,0 +1,370 @@ +package sqs + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_PushTooBigDelay(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + Delay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + RetryDelay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled + b.Stop() +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/sqs/durability_test.go b/plugins/jobs/broker/sqs/durability_test.go new file mode 100644 index 00000000..58ddf4b9 --- /dev/null +++ b/plugins/jobs/broker/sqs/durability_test.go @@ -0,0 +1,588 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9325", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:9325", + upstream: "localhost:9324", + accept: true, + } + + proxyPipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "lockReserved": 1, + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(false) + + _, serr = b.Stat(proxyPipe) + assert.Error(t, serr) + + proxy.reset(true) + + _, serr = b.Stat(proxyPipe) + assert.NoError(t, serr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(1).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/broker/sqs/job.go b/plugins/jobs/broker/sqs/job.go new file mode 100644 index 00000000..50e2c164 --- /dev/null +++ b/plugins/jobs/broker/sqs/job.go @@ -0,0 +1,80 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "time" +) + +var jobAttributes = []*string{ + aws.String("rr-job"), + aws.String("rr-maxAttempts"), + aws.String("rr-delay"), + aws.String("rr-timeout"), + aws.String("rr-retryDelay"), +} + +// pack job metadata into headers +func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { + return &sqs.SendMessageInput{ + QueueUrl: url, + DelaySeconds: aws.Int64(int64(j.Options.Delay)), + MessageBody: aws.String(j.Payload), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, + "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, + "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, + "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, + "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, + }, + } +} + +// unpack restores jobs.Options +func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { + if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { + return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") + } + attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) + + for _, attr := range jobAttributes { + if _, ok := msg.MessageAttributes[*attr]; !ok { + return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) + } + } + + j = &jobs.Job{ + Job: *msg.MessageAttributes["rr-job"].StringValue, + Payload: *msg.Body, + Options: &jobs.Options{}, + } + + if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { + j.Options.Delay = delay + } + + if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { + j.Options.Attempts = maxAttempts + } + + if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { + j.Options.Timeout = timeout + } + + if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { + j.Options.RetryDelay = retryDelay + } + + return *msg.MessageId, attempt - 1, j, nil +} + +func awsString(n int) *string { + return aws.String(strconv.Itoa(n)) +} + +func awsDuration(d time.Duration) *string { + return aws.String(strconv.Itoa(int(d.Seconds()))) +} diff --git a/plugins/jobs/broker/sqs/job_test.go b/plugins/jobs/broker/sqs/job_test.go new file mode 100644 index 00000000..a120af53 --- /dev/null +++ b/plugins/jobs/broker/sqs/job_test.go @@ -0,0 +1,19 @@ +package sqs + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack(t *testing.T) { + msg := &sqs.Message{ + Body: aws.String("body"), + Attributes: map[string]*string{}, + MessageAttributes: map[string]*sqs.MessageAttributeValue{}, + } + + _, _, _, err := unpack(msg) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/sqs/queue.go b/plugins/jobs/broker/sqs/queue.go new file mode 100644 index 00000000..8a92448e --- /dev/null +++ b/plugins/jobs/broker/sqs/queue.go @@ -0,0 +1,266 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + active int32 + pipe *jobs.Pipeline + url *string + reserve time.Duration + lockReserved time.Duration + + // queue events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) + } + + return &queue{ + pipe: pipe, + reserve: pipe.Duration("reserve", time.Second), + lockReserved: pipe.Duration("lockReserved", 300*time.Second), + lsn: lsn, + }, nil +} + +// declareQueue declared queue +func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { + attr := make(map[string]*string) + for k, v := range q.pipe.Map("declare") { + if vs, ok := v.(string); ok { + attr[k] = aws.String(vs) + } + + if vi, ok := v.(int); ok { + attr[k] = aws.String(strconv.Itoa(vi)) + } + + if vb, ok := v.(bool); ok { + if vb { + attr[k] = aws.String("true") + } else { + attr[k] = aws.String("false") + } + } + } + + if len(attr) != 0 { + r, err := s.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(q.pipe.String("queue", "")), + Attributes: attr, + }) + + return r.QueueUrl, err + } + + // no need to create (get existed) + r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) + if err != nil { + return nil, err + } + + return r.QueueUrl, nil +} + +// serve consumers +func (q *queue) serve(s *sqs.SQS, tout time.Duration) { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.active, 1) + + var errored bool + for { + messages, stop, err := q.consume(s) + if err != nil { + if errored { + // reoccurring error + time.Sleep(tout) + } else { + errored = true + q.report(err) + } + + continue + } + errored = false + + if stop { + return + } + + for _, msg := range messages { + h := <-q.execPool + go func(h jobs.Handler, msg *sqs.Message) { + err := q.do(s, h, msg) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, msg) + } + } +} + +// consume and allocate connection. +func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil, true, nil + default: + r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: q.url, + MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), + WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), + VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), + AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, + MessageAttributeNames: jobAttributes, + }) + if err != nil { + return nil, false, err + } + + q.wg.Add(len(r.Messages)) + + return r.Messages, false, nil + } +} + +// do single message +func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { + id, attempt, j, err := unpack(msg) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + // block the job based on known timeout + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), + }) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + err = h(id, j) + if err == nil { + return q.deleteMessage(s, msg, nil) + } + + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return q.deleteMessage(s, msg, err) + } + + // retry after specified duration + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), + }) + + return err +} + +func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { + _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) + return drr +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + close(q.wait) + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// add job to the queue +func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { + r, err := s.SendMessage(pack(q.url, j)) + if err != nil { + return "", err + } + + return *r.MessageId, nil +} + +// return queue stats +func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { + r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + QueueUrl: q.url, + AttributeNames: []*string{ + aws.String("ApproximateNumberOfMessages"), + aws.String("ApproximateNumberOfMessagesDelayed"), + aws.String("ApproximateNumberOfMessagesNotVisible"), + }, + }) + + if err != nil { + return nil, err + } + + stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} + + for a, v := range r.Attributes { + if a == "ApproximateNumberOfMessages" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Queue = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesNotVisible" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Active = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesDelayed" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Delayed = int64(v) + } + } + } + + return stat, nil +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/broker/sqs/stat_test.go b/plugins/jobs/broker/sqs/stat_test.go new file mode 100644 index 00000000..5031571b --- /dev/null +++ b/plugins/jobs/broker/sqs/stat_test.go @@ -0,0 +1,60 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // unable to use approximated stats + _, err = b.Stat(pipe) + assert.NoError(t, err) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + _, err := b.Stat(pipe) + assert.NoError(t, err) + + close(waitJob) + return nil + } + + <-waitJob + _, err = b.Stat(pipe) + assert.NoError(t, err) +} |