summaryrefslogtreecommitdiff
path: root/plugins/jobs/broker/sqs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/broker/sqs')
-rw-r--r--plugins/jobs/broker/sqs/broker.go189
-rw-r--r--plugins/jobs/broker/sqs/broker_test.go275
-rw-r--r--plugins/jobs/broker/sqs/config.go82
-rw-r--r--plugins/jobs/broker/sqs/config_test.go48
-rw-r--r--plugins/jobs/broker/sqs/consume_test.go370
-rw-r--r--plugins/jobs/broker/sqs/durability_test.go588
-rw-r--r--plugins/jobs/broker/sqs/job.go80
-rw-r--r--plugins/jobs/broker/sqs/job_test.go19
-rw-r--r--plugins/jobs/broker/sqs/queue.go266
-rw-r--r--plugins/jobs/broker/sqs/stat_test.go60
10 files changed, 1977 insertions, 0 deletions
diff --git a/plugins/jobs/broker/sqs/broker.go b/plugins/jobs/broker/sqs/broker.go
new file mode 100644
index 00000000..8cc62b6b
--- /dev/null
+++ b/plugins/jobs/broker/sqs/broker.go
@@ -0,0 +1,189 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
// Broker represents SQS broker. The zero value is not usable; Init must be
// called before any other method.
type Broker struct {
	cfg     *Config                          // broker configuration (credentials, endpoint, timeout)
	sqs     *sqs.SQS                         // SQS client, allocated in Serve
	lsn     func(event int, ctx interface{}) // event listener attached via Listen
	mu      sync.Mutex                       // guards wait, stopped, queues, sqs
	wait    chan error                       // non-nil while serving; Stop sends into it to unblock Serve
	stopped chan interface{}                 // closed when Serve returns
	queues  map[*jobs.Pipeline]*queue        // registered pipelines and their queues
}
+
// Listen attaches server event watcher.
// NOTE(review): lsn is written without holding b.mu while throw reads it
// concurrently — presumably callers attach the listener before Serve; confirm.
func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
	b.lsn = lsn
}
+
+// Init configures SQS broker.
+func (b *Broker) Init(cfg *Config) (ok bool, err error) {
+ b.cfg = cfg
+ b.queues = make(map[*jobs.Pipeline]*queue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+ }
+
+ q, err := newQueue(pipe, b.throw)
+ if err != nil {
+ return err
+ }
+
+ b.queues[pipe] = q
+
+ return nil
+}
+
+// Serve broker pipelines.
+func (b *Broker) Serve() (err error) {
+ b.mu.Lock()
+
+ b.sqs, err = b.cfg.SQS()
+ if err != nil {
+ return err
+ }
+
+ for _, q := range b.queues {
+ q.url, err = q.declareQueue(b.sqs)
+ if err != nil {
+ return err
+ }
+ }
+
+ for _, q := range b.queues {
+ qq := q
+ if qq.execPool != nil {
+ go qq.serve(b.sqs, b.cfg.TimeoutDuration())
+ }
+ }
+
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
// Stop all pipelines and block until Serve has returned. Calling Stop on a
// broker that was never served is a no-op.
func (b *Broker) Stop() {
	b.mu.Lock()
	defer b.mu.Unlock()

	// wait is only non-nil once Serve has started
	if b.wait == nil {
		return
	}

	// halt queue consumers before unblocking Serve
	for _, q := range b.queues {
		q.stop()
	}

	// NOTE(review): if Serve exited early with an error, nothing reads b.wait
	// and this send would block forever — confirm Stop is only reached while
	// Serve is still blocked on <-b.wait.
	b.wait <- nil
	<-b.stopped
}
+
// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
// the service is started!
func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	q, ok := b.queues[pipe]
	if !ok {
		return fmt.Errorf("undefined queue `%s`", pipe.Name())
	}

	// halt any current consumption before swapping handlers
	q.stop()

	q.execPool = execPool
	q.errHandler = errHandler

	// broker already serving (sqs allocated): resume consumption immediately
	if b.sqs != nil && q.execPool != nil {
		go q.serve(b.sqs, b.cfg.TimeoutDuration())
	}

	return nil
}
+
+// Push job into the worker.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ if j.Options.Delay > 900 || j.Options.RetryDelay > 900 {
+ return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name())
+ }
+
+ return q.send(b.sqs, j)
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ return q.stat(b.sqs)
+}
+
+// check if broker is serving
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// queue returns queue associated with the pipeline.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
// throw handles service, server and pool events by forwarding them to the
// listener attached via Listen, if any.
func (b *Broker) throw(event int, ctx interface{}) {
	if b.lsn != nil {
		b.lsn(event, ctx)
	}
}
diff --git a/plugins/jobs/broker/sqs/broker_test.go b/plugins/jobs/broker/sqs/broker_test.go
new file mode 100644
index 00000000..c87b302d
--- /dev/null
+++ b/plugins/jobs/broker/sqs/broker_test.go
@@ -0,0 +1,275 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
var (
	// pipe is the pipeline shared by most broker tests; it declares the
	// `test` queue with a one-day message retention period.
	pipe = &jobs.Pipeline{
		"broker": "sqs",
		"name":   "default",
		"queue":  "test",
		"declare": map[string]interface{}{
			"MessageRetentionPeriod": 86400,
		},
	}

	// cfg points at a local SQS-compatible endpoint (e.g. ElasticMQ);
	// the credentials are dummies.
	cfg = &Config{
		Key:      "api-key",
		Secret:   "api-secret",
		Region:   "us-west-1",
		Endpoint: "http://localhost:9324",
	}
)
+
+func TestBroker_Init(t *testing.T) {
+ b := &Broker{}
+ ok, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.True(t, ok)
+ assert.NoError(t, err)
+}
+
+func TestBroker_StopNotStarted(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ b.Stop()
+}
+
+func TestBroker_Register(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+}
+
+func TestBroker_RegisterInvalid(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Error(t, b.Register(&jobs.Pipeline{
+ "broker": "sqs",
+ "name": "default",
+ }))
+}
+
+func TestBroker_Register_Twice(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+ assert.Error(t, b.Register(pipe))
+}
+
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_Undefined(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = b.Consume(pipe, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ b.Consume(pipe, exec, errf)
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) {
+ pipe := &jobs.Pipeline{
+ "broker": "sqs",
+ "name": "default",
+ "queue": "invalid",
+ "declare": map[string]interface{}{
+ "VisibilityTimeout": "invalid",
+ },
+ }
+
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ b.Consume(pipe, exec, errf)
+
+ assert.Error(t, b.Serve())
+}
+
+func TestBroker_PushToNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_PushToNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/sqs/config.go b/plugins/jobs/broker/sqs/config.go
new file mode 100644
index 00000000..d0c2f2b2
--- /dev/null
+++ b/plugins/jobs/broker/sqs/config.go
@@ -0,0 +1,82 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
// Config defines sqs broker configuration.
type Config struct {
	// Region defines SQS region, not required when endpoint is not empty.
	Region string

	// Key defines AWS API key, not required when endpoint is not empty.
	Key string

	// Secret defines AWS API secret, not required when endpoint is not empty.
	Secret string

	// Endpoint can be used to re-define SQS endpoint to custom location. Only for local development.
	Endpoint string

	// Timeout to allocate the connection. Default 10 seconds.
	Timeout int
}
+
+// Hydrate config values.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.Region == "" {
+ return fmt.Errorf("SQS region is missing")
+ }
+
+ if c.Key == "" {
+ return fmt.Errorf("SQS key is missing")
+ }
+
+ if c.Secret == "" {
+ return fmt.Errorf("SQS secret is missing")
+ }
+
+ return nil
+}
+
+// TimeoutDuration returns number of seconds allowed to allocate the connection.
+func (c *Config) TimeoutDuration() time.Duration {
+ timeout := c.Timeout
+ if timeout == 0 {
+ timeout = 10
+ }
+
+ return time.Duration(timeout) * time.Second
+}
+
// Session returns a new AWS session built from the configured region and
// static credentials (no session token).
func (c *Config) Session() (*session.Session, error) {
	return session.NewSession(&aws.Config{
		Region:      aws.String(c.Region),
		Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""),
	})
}
+
+// SQS returns new SQS instance or error.
+func (c *Config) SQS() (*sqs.SQS, error) {
+ sess, err := c.Session()
+ if err != nil {
+ return nil, err
+ }
+
+ if c.Endpoint == "" {
+ return sqs.New(sess), nil
+ }
+
+ return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil
+}
diff --git a/plugins/jobs/broker/sqs/config_test.go b/plugins/jobs/broker/sqs/config_test.go
new file mode 100644
index 00000000..b36b3c6f
--- /dev/null
+++ b/plugins/jobs/broker/sqs/config_test.go
@@ -0,0 +1,48 @@
+package sqs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// mockCfg is a minimal service.Config stub backed by a raw JSON string.
type mockCfg struct{ cfg string }

// Get satisfies service.Config; nested sections are not supported.
func (cfg *mockCfg) Get(name string) service.Config { return nil }

// Unmarshal decodes the raw JSON payload into out.
func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
// Malformed JSON must surface the Unmarshal error.
func Test_Config_Hydrate_Error(t *testing.T) {
	cfg := &mockCfg{`{"dead`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// Empty config: region validation must fail.
func Test_Config_Hydrate_Error2(t *testing.T) {
	cfg := &mockCfg{`{}`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// Region only: missing key must fail.
func Test_Config_Hydrate_Error3(t *testing.T) {
	cfg := &mockCfg{`{"region":"us-east-1"}`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// Region and key only: missing secret must fail.
func Test_Config_Hydrate_Error4(t *testing.T) {
	cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// Region, key and secret present: hydration succeeds.
func Test_Config_Hydrate_Error5(t *testing.T) {
	cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))
}
diff --git a/plugins/jobs/broker/sqs/consume_test.go b/plugins/jobs/broker/sqs/consume_test.go
new file mode 100644
index 00000000..434fc6ea
--- /dev/null
+++ b/plugins/jobs/broker/sqs/consume_test.go
@@ -0,0 +1,370 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Consume_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) {
+ pipe := &jobs.Pipeline{
+ "broker": "sqs",
+ "name": "default",
+ "queue": "test",
+ }
+
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_PushTooBigDelay(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{
+ Delay: 901,
+ },
+ })
+
+ assert.Error(t, perr)
+}
+
+func TestBroker_Consume_PushTooBigDelay2(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{
+ RetryDelay: 901,
+ },
+ })
+
+ assert.Error(t, perr)
+}
+
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_Delayed(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Delay: 1},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ start := time.Now()
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ elapsed := time.Since(start)
+ assert.True(t, elapsed > time.Second)
+ assert.True(t, elapsed < 2*time.Second)
+}
+
+func TestBroker_Consume_Errored(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ close(errHandled)
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return fmt.Errorf("job failed")
+ }
+
+ <-waitJob
+ <-errHandled
+ b.Stop()
+}
+
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ attempts := 0
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ attempts++
+ errHandled <- nil
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Attempts: 3},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ return fmt.Errorf("job failed")
+ }
+
+ <-errHandled
+ <-errHandled
+ <-errHandled
+ assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/broker/sqs/durability_test.go b/plugins/jobs/broker/sqs/durability_test.go
new file mode 100644
index 00000000..58ddf4b9
--- /dev/null
+++ b/plugins/jobs/broker/sqs/durability_test.go
@@ -0,0 +1,588 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
var (
	// proxyCfg targets the tcpProxy below instead of the SQS endpoint
	// directly, so tests can sever connectivity at will. The 1s timeout
	// keeps failure tests fast.
	proxyCfg = &Config{
		Key:      "api-key",
		Secret:   "api-secret",
		Region:   "us-west-1",
		Endpoint: "http://localhost:9325",
		Timeout:  1,
	}

	// proxy relays 9325 -> 9324 (the local SQS endpoint) and starts in
	// accepting mode.
	proxy = &tcpProxy{
		listen:   "localhost:9325",
		upstream: "localhost:9324",
		accept:   true,
	}

	// proxyPipe mirrors the standard test pipeline but is routed through
	// the proxy config above.
	proxyPipe = &jobs.Pipeline{
		"broker":       "sqs",
		"name":         "default",
		"queue":        "test",
		"lockReserved": 1,
		"declare": map[string]interface{}{
			"MessageRetentionPeriod": 86400,
		},
	}
)
+
// tcpProxy is a controllable TCP relay used to simulate network failures
// between the broker and the SQS endpoint.
type tcpProxy struct {
	listen   string     // address to accept broker connections on
	upstream string     // real SQS endpoint address
	mu       sync.Mutex // guards accept and conn
	accept   bool       // when false, new connections are refused
	conn     []net.Conn // live connection pairs (in, up), appended per accept
}
+
+func (p *tcpProxy) serve() {
+ l, err := net.Listen("tcp", p.listen)
+ if err != nil {
+ panic(err)
+ }
+
+ for {
+ in, err := l.Accept()
+ if err != nil {
+ panic(err)
+ }
+
+ if !p.accepting() {
+ in.Close()
+ }
+
+ up, err := net.Dial("tcp", p.upstream)
+ if err != nil {
+ panic(err)
+ }
+
+ go io.Copy(in, up)
+ go io.Copy(up, in)
+
+ p.mu.Lock()
+ p.conn = append(p.conn, in, up)
+ p.mu.Unlock()
+ }
+}
+
// waitConn re-enables accepting and busy-waits (1ms polls) until `count`
// connection pairs exist; p.conn stores both ends, hence the *2.
func (p *tcpProxy) waitConn(count int) *tcpProxy {
	p.mu.Lock()
	p.accept = true
	p.mu.Unlock()

	for {
		p.mu.Lock()
		current := len(p.conn)
		p.mu.Unlock()

		if current >= count*2 {
			break
		}

		time.Sleep(time.Millisecond)
	}

	return p
}
+
// reset closes every tracked connection, sets the accepting flag, and
// returns the number of connection pairs that were torn down.
func (p *tcpProxy) reset(accept bool) int {
	p.mu.Lock()
	p.accept = accept
	defer p.mu.Unlock()

	count := 0
	for _, conn := range p.conn {
		conn.Close()
		count++
	}

	p.conn = nil
	// conn held both ends of each pair, so halve for the pair count
	return count / 2
}
+
+func (p *tcpProxy) accepting() bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.accept
+}
+
// Start the proxy once for the whole durability test suite; it runs for the
// lifetime of the test binary.
func init() {
	go proxy.serve()
}
+
+func TestBroker_Durability_Base(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ // expect 2 connections
+ proxy.waitConn(1)
+
+ jid, perr := b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Durability_Consume(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1).reset(false)
+
+ jid, perr := b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(1)
+
+ jid, perr = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1).reset(false)
+
+ jid, perr := b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ time.Sleep(3 * time.Second)
+ proxy.waitConn(1)
+
+ jid, perr = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume2(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1).reset(false)
+
+ jid, perr := b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.Error(t, perr)
+
+ // restore
+ proxy.waitConn(2)
+
+ jid, perr = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ st, serr := b.Stat(proxyPipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(1), st.Queue+st.Active)
+
+ proxy.reset(false)
+
+ _, serr = b.Stat(proxyPipe)
+ assert.Error(t, serr)
+
+ proxy.reset(true)
+
+ _, serr = b.Stat(proxyPipe)
+ assert.NoError(t, serr)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume3(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1)
+
+ jid, perr := b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ st, serr := b.Stat(proxyPipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(1), st.Queue+st.Active)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 1 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_Consume4(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ proxy.waitConn(1)
+
+ _, err = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "kill",
+ Options: &jobs.Options{Timeout: 2},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(proxyPipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Timeout: 2},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ st, serr := b.Stat(proxyPipe)
+ assert.NoError(t, serr)
+ assert.Equal(t, int64(3), st.Queue+st.Active)
+
+ mu := sync.Mutex{}
+ done := make(map[string]bool)
+ exec <- func(id string, j *jobs.Job) error {
+ mu.Lock()
+ defer mu.Unlock()
+ done[id] = true
+
+ if j.Payload == "kill" {
+ proxy.reset(true)
+ }
+
+ return nil
+ }
+
+ for {
+ mu.Lock()
+ num := len(done)
+ mu.Unlock()
+
+ if num >= 3 {
+ break
+ }
+ }
+}
+
+func TestBroker_Durability_StopDead(t *testing.T) {
+ defer proxy.reset(true)
+
+ b := &Broker{}
+ _, err := b.Init(proxyCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(proxyPipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+
+ <-ready
+
+ proxy.waitConn(1).reset(false)
+
+ b.Stop()
+}
diff --git a/plugins/jobs/broker/sqs/job.go b/plugins/jobs/broker/sqs/job.go
new file mode 100644
index 00000000..50e2c164
--- /dev/null
+++ b/plugins/jobs/broker/sqs/job.go
@@ -0,0 +1,80 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "time"
+)
+
// jobAttributes lists the SQS message attribute names used to carry job
// metadata; they are requested in ReceiveMessage and validated by unpack.
var jobAttributes = []*string{
	aws.String("rr-job"),
	aws.String("rr-maxAttempts"),
	aws.String("rr-delay"),
	aws.String("rr-timeout"),
	aws.String("rr-retryDelay"),
}
+
+// pack job metadata into headers
+func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ QueueUrl: url,
+ DelaySeconds: aws.Int64(int64(j.Options.Delay)),
+ MessageBody: aws.String(j.Payload),
+ MessageAttributes: map[string]*sqs.MessageAttributeValue{
+ "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
+ "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
+ "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
+ "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
+ "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
+ },
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
+ if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
+ return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
+ }
+ attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
+
+ for _, attr := range jobAttributes {
+ if _, ok := msg.MessageAttributes[*attr]; !ok {
+ return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
+ }
+ }
+
+ j = &jobs.Job{
+ Job: *msg.MessageAttributes["rr-job"].StringValue,
+ Payload: *msg.Body,
+ Options: &jobs.Options{},
+ }
+
+ if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
+ j.Options.Delay = delay
+ }
+
+ if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
+ j.Options.Attempts = maxAttempts
+ }
+
+ if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
+ j.Options.Timeout = timeout
+ }
+
+ if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
+ j.Options.RetryDelay = retryDelay
+ }
+
+ return *msg.MessageId, attempt - 1, j, nil
+}
+
+func awsString(n int) *string {
+ return aws.String(strconv.Itoa(n))
+}
+
+func awsDuration(d time.Duration) *string {
+ return aws.String(strconv.Itoa(int(d.Seconds())))
+}
diff --git a/plugins/jobs/broker/sqs/job_test.go b/plugins/jobs/broker/sqs/job_test.go
new file mode 100644
index 00000000..a120af53
--- /dev/null
+++ b/plugins/jobs/broker/sqs/job_test.go
@@ -0,0 +1,19 @@
+package sqs
+
+import (
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_Unpack(t *testing.T) {
+ msg := &sqs.Message{
+ Body: aws.String("body"),
+ Attributes: map[string]*string{},
+ MessageAttributes: map[string]*sqs.MessageAttributeValue{},
+ }
+
+ _, _, _, err := unpack(msg)
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/sqs/queue.go b/plugins/jobs/broker/sqs/queue.go
new file mode 100644
index 00000000..8a92448e
--- /dev/null
+++ b/plugins/jobs/broker/sqs/queue.go
@@ -0,0 +1,266 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// queue represents a single consumed SQS queue bound to one pipeline.
type queue struct {
	// active is an atomic flag: set to 1 by serve(), 0 by stop().
	active int32
	pipe   *jobs.Pipeline
	// url is the queue URL resolved by declareQueue.
	url *string
	// reserve is the ReceiveMessage long-poll wait time.
	reserve time.Duration
	// lockReserved is the initial visibility timeout applied on receive.
	lockReserved time.Duration

	// queue events
	lsn func(event int, ctx interface{})

	// stop channel; closed by stop() to terminate serve()/consume().
	wait chan interface{}

	// active operations: muw serializes consume() against stop(),
	// wg tracks in-flight message handlers.
	muw sync.RWMutex
	wg  sync.WaitGroup

	// exec handlers
	execPool   chan jobs.Handler
	errHandler jobs.ErrorHandler
}
+
+func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
+ if pipe.String("queue", "") == "" {
+ return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name())
+ }
+
+ return &queue{
+ pipe: pipe,
+ reserve: pipe.Duration("reserve", time.Second),
+ lockReserved: pipe.Duration("lockReserved", 300*time.Second),
+ lsn: lsn,
+ }, nil
+}
+
+// declareQueue declared queue
+func (q *queue) declareQueue(s *sqs.SQS) (*string, error) {
+ attr := make(map[string]*string)
+ for k, v := range q.pipe.Map("declare") {
+ if vs, ok := v.(string); ok {
+ attr[k] = aws.String(vs)
+ }
+
+ if vi, ok := v.(int); ok {
+ attr[k] = aws.String(strconv.Itoa(vi))
+ }
+
+ if vb, ok := v.(bool); ok {
+ if vb {
+ attr[k] = aws.String("true")
+ } else {
+ attr[k] = aws.String("false")
+ }
+ }
+ }
+
+ if len(attr) != 0 {
+ r, err := s.CreateQueue(&sqs.CreateQueueInput{
+ QueueName: aws.String(q.pipe.String("queue", "")),
+ Attributes: attr,
+ })
+
+ return r.QueueUrl, err
+ }
+
+ // no need to create (get existed)
+ r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))})
+ if err != nil {
+ return nil, err
+ }
+
+ return r.QueueUrl, nil
+}
+
// serve consumers: polls SQS in a loop, dispatching each received message to
// a handler borrowed from the exec pool, until stop() closes the wait channel.
// tout is the back-off applied between reoccurring receive errors.
func (q *queue) serve(s *sqs.SQS, tout time.Duration) {
	q.wait = make(chan interface{})
	atomic.StoreInt32(&q.active, 1)

	var errored bool
	for {
		messages, stop, err := q.consume(s)
		if err != nil {
			if errored {
				// reoccurring error: back off instead of hot-looping,
				// and report only the first error of the streak
				time.Sleep(tout)
			} else {
				errored = true
				q.report(err)
			}

			continue
		}
		errored = false

		if stop {
			return
		}

		for _, msg := range messages {
			// borrow a handler; blocks until one is free, bounding concurrency
			// to the exec pool size
			h := <-q.execPool
			go func(h jobs.Handler, msg *sqs.Message) {
				err := q.do(s, h, msg)
				// return the handler before signaling completion
				q.execPool <- h
				// matching wg.Add was done in consume() per received message
				q.wg.Done()
				q.report(err)
			}(h, msg)
		}
	}
}
+
// consume receives one batch of messages; it returns stop=true once stop()
// has closed the wait channel. Holding muw for the duration of the receive
// prevents stop() from declaring the queue drained mid-receive.
func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) {
	q.muw.Lock()
	defer q.muw.Unlock()

	select {
	case <-q.wait:
		// queue has been stopped
		return nil, true, nil
	default:
		r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:            q.url,
			MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))),
			// long-poll up to `reserve` seconds for messages
			WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())),
			// hide received messages until do() can pin the real job timeout
			VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())),
			AttributeNames:    []*string{aws.String("ApproximateReceiveCount")},
			MessageAttributeNames: jobAttributes,
		})
		if err != nil {
			return nil, false, err
		}

		// one wg slot per message; released in serve() after do() completes
		q.wg.Add(len(r.Messages))

		return r.Messages, false, nil
	}
}
+
// do processes a single message: unpack it, pin its visibility to the job
// timeout, run the handler, then delete it or schedule a retry by shrinking
// its visibility timeout.
func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) {
	id, attempt, j, err := unpack(msg)
	if err != nil {
		// malformed message (mixed queue?) - drop it asynchronously
		go q.deleteMessage(s, msg, err)
		return err
	}

	// block the job based on known timeout
	_, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          q.url,
		ReceiptHandle:     msg.ReceiptHandle,
		VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())),
	})
	if err != nil {
		go q.deleteMessage(s, msg, err)
		return err
	}

	err = h(id, j)
	if err == nil {
		// success - remove the message from the queue
		return q.deleteMessage(s, msg, nil)
	}

	q.errHandler(id, j, err)

	if !j.Options.CanRetry(attempt) {
		// attempts exhausted - drop the message
		return q.deleteMessage(s, msg, err)
	}

	// retry after specified duration; reducing visibility makes the message
	// receivable again once RetryDelay seconds elapse
	_, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          q.url,
		ReceiptHandle:     msg.ReceiptHandle,
		VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)),
	})

	return err
}
+
+func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error {
+ _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle})
+ return drr
+}
+
+// stop the queue consuming
+func (q *queue) stop() {
+ if atomic.LoadInt32(&q.active) == 0 {
+ return
+ }
+
+ atomic.StoreInt32(&q.active, 0)
+
+ close(q.wait)
+ q.muw.Lock()
+ q.wg.Wait()
+ q.muw.Unlock()
+}
+
+// add job to the queue
+func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) {
+ r, err := s.SendMessage(pack(q.url, j))
+ if err != nil {
+ return "", err
+ }
+
+ return *r.MessageId, nil
+}
+
+// return queue stats
+func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
+ r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{
+ QueueUrl: q.url,
+ AttributeNames: []*string{
+ aws.String("ApproximateNumberOfMessages"),
+ aws.String("ApproximateNumberOfMessagesDelayed"),
+ aws.String("ApproximateNumberOfMessagesNotVisible"),
+ },
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")}
+
+ for a, v := range r.Attributes {
+ if a == "ApproximateNumberOfMessages" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Queue = int64(v)
+ }
+ }
+
+ if a == "ApproximateNumberOfMessagesNotVisible" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Active = int64(v)
+ }
+ }
+
+ if a == "ApproximateNumberOfMessagesDelayed" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Delayed = int64(v)
+ }
+ }
+ }
+
+ return stat, nil
+}
+
+// throw handles service, server and pool events.
+func (q *queue) report(err error) {
+ if err != nil {
+ q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
+ }
+}
diff --git a/plugins/jobs/broker/sqs/stat_test.go b/plugins/jobs/broker/sqs/stat_test.go
new file mode 100644
index 00000000..5031571b
--- /dev/null
+++ b/plugins/jobs/broker/sqs/stat_test.go
@@ -0,0 +1,60 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestBroker_Stat(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ // unable to use approximated stats
+ _, err = b.Stat(pipe)
+ assert.NoError(t, err)
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ _, err := b.Stat(pipe)
+ assert.NoError(t, err)
+
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+ _, err = b.Stat(pipe)
+ assert.NoError(t, err)
+}