summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/sqs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
committerValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
commit1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch)
tree5abedf7306b50b02ba3892c0bc562307a62eb332 /plugins/jobs/oooold/broker/sqs
parent260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff)
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/sqs')
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker.go189
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker_test.go275
-rw-r--r--plugins/jobs/oooold/broker/sqs/config.go82
-rw-r--r--plugins/jobs/oooold/broker/sqs/config_test.go48
-rw-r--r--plugins/jobs/oooold/broker/sqs/consume_test.go370
-rw-r--r--plugins/jobs/oooold/broker/sqs/durability_test.go588
-rw-r--r--plugins/jobs/oooold/broker/sqs/job.go80
-rw-r--r--plugins/jobs/oooold/broker/sqs/job_test.go19
-rw-r--r--plugins/jobs/oooold/broker/sqs/queue.go266
-rw-r--r--plugins/jobs/oooold/broker/sqs/stat_test.go60
10 files changed, 0 insertions, 1977 deletions
diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go
deleted file mode 100644
index 8cc62b6b..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker represents SQS broker.
-type Broker struct {
- cfg *Config
- sqs *sqs.SQS
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures SQS broker.
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- b.sqs, err = b.cfg.SQS()
- if err != nil {
- return err
- }
-
- for _, q := range b.queues {
- q.url, err = q.declareQueue(b.sqs)
- if err != nil {
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.sqs, b.cfg.TimeoutDuration())
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- b.wait <- nil
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.sqs != nil && q.execPool != nil {
- go q.serve(b.sqs, b.cfg.TimeoutDuration())
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if j.Options.Delay > 900 || j.Options.RetryDelay > 900 {
- return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name())
- }
-
- return q.send(b.sqs, j)
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(b.sqs)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go
deleted file mode 100644
index c87b302d..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker_test.go
+++ /dev/null
@@ -1,275 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-
- cfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9324",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_RegisterInvalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- }))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "invalid",
- "declare": map[string]interface{}{
- "VisibilityTimeout": "invalid",
- },
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go
deleted file mode 100644
index d0c2f2b2..00000000
--- a/plugins/jobs/oooold/broker/sqs/config.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/credentials"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/roadrunner/service"
- "time"
-)
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Region defined SQS region, not required when endpoint is not empty.
- Region string
-
- // Region defined AWS API key, not required when endpoint is not empty.
- Key string
-
- // Region defined AWS API secret, not required when endpoint is not empty.
- Secret string
-
- // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development.
- Endpoint string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Region == "" {
- return fmt.Errorf("SQS region is missing")
- }
-
- if c.Key == "" {
- return fmt.Errorf("SQS key is missing")
- }
-
- if c.Secret == "" {
- return fmt.Errorf("SQS secret is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// Session returns new AWS session.
-func (c *Config) Session() (*session.Session, error) {
- return session.NewSession(&aws.Config{
- Region: aws.String(c.Region),
- Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""),
- })
-}
-
-// SQS returns new SQS instance or error.
-func (c *Config) SQS() (*sqs.SQS, error) {
- sess, err := c.Session()
- if err != nil {
- return nil, err
- }
-
- if c.Endpoint == "" {
- return sqs.New(sess), nil
- }
-
- return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go
deleted file mode 100644
index b36b3c6f..00000000
--- a/plugins/jobs/oooold/broker/sqs/config_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package sqs
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error5(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go
deleted file mode 100644
index 434fc6ea..00000000
--- a/plugins/jobs/oooold/broker/sqs/consume_test.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_PushTooBigDelay(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- Delay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_Consume_PushTooBigDelay2(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- RetryDelay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
- b.Stop()
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go
deleted file mode 100644
index 58ddf4b9..00000000
--- a/plugins/jobs/oooold/broker/sqs/durability_test.go
+++ /dev/null
@@ -1,588 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9325",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:9325",
- upstream: "localhost:9324",
- accept: true,
- }
-
- proxyPipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "lockReserved": 1,
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(false)
-
- _, serr = b.Stat(proxyPipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- _, serr = b.Stat(proxyPipe)
- assert.NoError(t, serr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go
deleted file mode 100644
index 50e2c164..00000000
--- a/plugins/jobs/oooold/broker/sqs/job.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "time"
-)
-
-var jobAttributes = []*string{
- aws.String("rr-job"),
- aws.String("rr-maxAttempts"),
- aws.String("rr-delay"),
- aws.String("rr-timeout"),
- aws.String("rr-retryDelay"),
-}
-
-// pack job metadata into headers
-func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
- return &sqs.SendMessageInput{
- QueueUrl: url,
- DelaySeconds: aws.Int64(int64(j.Options.Delay)),
- MessageBody: aws.String(j.Payload),
- MessageAttributes: map[string]*sqs.MessageAttributeValue{
- "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
- "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
- "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
- "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
- "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
- },
- }
-}
-
-// unpack restores jobs.Options
-func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
- if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
- return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
- }
- attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
-
- for _, attr := range jobAttributes {
- if _, ok := msg.MessageAttributes[*attr]; !ok {
- return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
- }
- }
-
- j = &jobs.Job{
- Job: *msg.MessageAttributes["rr-job"].StringValue,
- Payload: *msg.Body,
- Options: &jobs.Options{},
- }
-
- if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
- j.Options.Delay = delay
- }
-
- if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
- j.Options.Attempts = maxAttempts
- }
-
- if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
- j.Options.Timeout = timeout
- }
-
- if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
- j.Options.RetryDelay = retryDelay
- }
-
- return *msg.MessageId, attempt - 1, j, nil
-}
-
-func awsString(n int) *string {
- return aws.String(strconv.Itoa(n))
-}
-
-func awsDuration(d time.Duration) *string {
- return aws.String(strconv.Itoa(int(d.Seconds())))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go
deleted file mode 100644
index a120af53..00000000
--- a/plugins/jobs/oooold/broker/sqs/job_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package sqs
-
-import (
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack(t *testing.T) {
- msg := &sqs.Message{
- Body: aws.String("body"),
- Attributes: map[string]*string{},
- MessageAttributes: map[string]*sqs.MessageAttributeValue{},
- }
-
- _, _, _, err := unpack(msg)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go
deleted file mode 100644
index 8a92448e..00000000
--- a/plugins/jobs/oooold/broker/sqs/queue.go
+++ /dev/null
@@ -1,266 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- url *string
- reserve time.Duration
- lockReserved time.Duration
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name())
- }
-
- return &queue{
- pipe: pipe,
- reserve: pipe.Duration("reserve", time.Second),
- lockReserved: pipe.Duration("lockReserved", 300*time.Second),
- lsn: lsn,
- }, nil
-}
-
-// declareQueue declared queue
-func (q *queue) declareQueue(s *sqs.SQS) (*string, error) {
- attr := make(map[string]*string)
- for k, v := range q.pipe.Map("declare") {
- if vs, ok := v.(string); ok {
- attr[k] = aws.String(vs)
- }
-
- if vi, ok := v.(int); ok {
- attr[k] = aws.String(strconv.Itoa(vi))
- }
-
- if vb, ok := v.(bool); ok {
- if vb {
- attr[k] = aws.String("true")
- } else {
- attr[k] = aws.String("false")
- }
- }
- }
-
- if len(attr) != 0 {
- r, err := s.CreateQueue(&sqs.CreateQueueInput{
- QueueName: aws.String(q.pipe.String("queue", "")),
- Attributes: attr,
- })
-
- return r.QueueUrl, err
- }
-
- // no need to create (get existed)
- r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))})
- if err != nil {
- return nil, err
- }
-
- return r.QueueUrl, nil
-}
-
-// serve consumers
-func (q *queue) serve(s *sqs.SQS, tout time.Duration) {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.active, 1)
-
- var errored bool
- for {
- messages, stop, err := q.consume(s)
- if err != nil {
- if errored {
- // reoccurring error
- time.Sleep(tout)
- } else {
- errored = true
- q.report(err)
- }
-
- continue
- }
- errored = false
-
- if stop {
- return
- }
-
- for _, msg := range messages {
- h := <-q.execPool
- go func(h jobs.Handler, msg *sqs.Message) {
- err := q.do(s, h, msg)
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, msg)
- }
- }
-}
-
-// consume and allocate connection.
-func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil, true, nil
- default:
- r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{
- QueueUrl: q.url,
- MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))),
- WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())),
- VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())),
- AttributeNames: []*string{aws.String("ApproximateReceiveCount")},
- MessageAttributeNames: jobAttributes,
- })
- if err != nil {
- return nil, false, err
- }
-
- q.wg.Add(len(r.Messages))
-
- return r.Messages, false, nil
- }
-}
-
-// do single message
-func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) {
- id, attempt, j, err := unpack(msg)
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- // block the job based on known timeout
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())),
- })
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- err = h(id, j)
- if err == nil {
- return q.deleteMessage(s, msg, nil)
- }
-
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return q.deleteMessage(s, msg, err)
- }
-
- // retry after specified duration
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)),
- })
-
- return err
-}
-
-func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error {
- _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle})
- return drr
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- close(q.wait)
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// add job to the queue
-func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) {
- r, err := s.SendMessage(pack(q.url, j))
- if err != nil {
- return "", err
- }
-
- return *r.MessageId, nil
-}
-
-// return queue stats
-func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
- r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{
- QueueUrl: q.url,
- AttributeNames: []*string{
- aws.String("ApproximateNumberOfMessages"),
- aws.String("ApproximateNumberOfMessagesDelayed"),
- aws.String("ApproximateNumberOfMessagesNotVisible"),
- },
- })
-
- if err != nil {
- return nil, err
- }
-
- stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")}
-
- for a, v := range r.Attributes {
- if a == "ApproximateNumberOfMessages" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Queue = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesNotVisible" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Active = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesDelayed" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Delayed = int64(v)
- }
- }
- }
-
- return stat, nil
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go
deleted file mode 100644
index 5031571b..00000000
--- a/plugins/jobs/oooold/broker/sqs/stat_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // unable to use approximated stats
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- _, err := b.Stat(pipe)
- assert.NoError(t, err)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-}