diff options
author | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-22 11:44:22 +0300 |
commit | 1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch) | |
tree | 5abedf7306b50b02ba3892c0bc562307a62eb332 /plugins/jobs/oooold/broker/sqs | |
parent | 260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff) |
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/oooold/broker/sqs')
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/broker.go | 189 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/broker_test.go | 275 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/config.go | 82 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/config_test.go | 48 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/consume_test.go | 370 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/durability_test.go | 588 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/job.go | 80 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/job_test.go | 19 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/queue.go | 266 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/sqs/stat_test.go | 60 |
10 files changed, 0 insertions, 1977 deletions
diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go deleted file mode 100644 index 8cc62b6b..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker.go +++ /dev/null @@ -1,189 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker represents SQS broker. -type Broker struct { - cfg *Config - sqs *sqs.SQS - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures SQS broker. -func (b *Broker) Init(cfg *Config) (ok bool, err error) { - b.cfg = cfg - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - q, err := newQueue(pipe, b.throw) - if err != nil { - return err - } - - b.queues[pipe] = q - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() (err error) { - b.mu.Lock() - - b.sqs, err = b.cfg.SQS() - if err != nil { - return err - } - - for _, q := range b.queues { - q.url, err = q.declareQueue(b.sqs) - if err != nil { - return err - } - } - - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve(b.sqs, b.cfg.TimeoutDuration()) - } - } - - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - for _, q := range b.queues { - q.stop() - } - - b.wait <- nil - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.sqs != nil && q.execPool != nil { - go q.serve(b.sqs, b.cfg.TimeoutDuration()) - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { - return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) - } - - return q.send(b.sqs, j) -} - -// Stat must fetch statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(b.sqs) -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go deleted file mode 100644 index c87b302d..00000000 --- a/plugins/jobs/oooold/broker/sqs/broker_test.go +++ /dev/null @@ -1,275 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } - - cfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9324", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_RegisterInvalid(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.Error(t, b.Register(&jobs.Pipeline{ - "broker": "sqs", - "name": "default", - })) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "invalid", - "declare": map[string]interface{}{ - "VisibilityTimeout": "invalid", - }, - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - b.Consume(pipe, exec, errf) - - assert.Error(t, b.Serve()) -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go deleted file mode 100644 index d0c2f2b2..00000000 --- a/plugins/jobs/oooold/broker/sqs/config.go +++ /dev/null @@ -1,82 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/roadrunner/service" - "time" -) - -// Config defines sqs broker configuration. -type Config struct { - // Region defined SQS region, not required when endpoint is not empty. - Region string - - // Region defined AWS API key, not required when endpoint is not empty. - Key string - - // Region defined AWS API secret, not required when endpoint is not empty. - Secret string - - // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. - Endpoint string - - // Timeout to allocate the connection. Default 10 seconds. - Timeout int -} - -// Hydrate config values. -func (c *Config) Hydrate(cfg service.Config) error { - if err := cfg.Unmarshal(c); err != nil { - return err - } - - if c.Region == "" { - return fmt.Errorf("SQS region is missing") - } - - if c.Key == "" { - return fmt.Errorf("SQS key is missing") - } - - if c.Secret == "" { - return fmt.Errorf("SQS secret is missing") - } - - return nil -} - -// TimeoutDuration returns number of seconds allowed to allocate the connection. -func (c *Config) TimeoutDuration() time.Duration { - timeout := c.Timeout - if timeout == 0 { - timeout = 10 - } - - return time.Duration(timeout) * time.Second -} - -// Session returns new AWS session. -func (c *Config) Session() (*session.Session, error) { - return session.NewSession(&aws.Config{ - Region: aws.String(c.Region), - Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), - }) -} - -// SQS returns new SQS instance or error. -func (c *Config) SQS() (*sqs.SQS, error) { - sess, err := c.Session() - if err != nil { - return nil, err - } - - if c.Endpoint == "" { - return sqs.New(sess), nil - } - - return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil -} diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go deleted file mode 100644 index b36b3c6f..00000000 --- a/plugins/jobs/oooold/broker/sqs/config_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package sqs - -import ( - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &mockCfg{`{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error3(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error4(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error5(t *testing.T) { - cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go deleted file mode 100644 index 434fc6ea..00000000 --- a/plugins/jobs/oooold/broker/sqs/consume_test.go +++ /dev/null @@ -1,370 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { - pipe := &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - } - - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_PushTooBigDelay(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - Delay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{ - RetryDelay: 901, - }, - }) - - assert.Error(t, perr) -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled - b.Stop() -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go deleted file mode 100644 index 58ddf4b9..00000000 --- a/plugins/jobs/oooold/broker/sqs/durability_test.go +++ /dev/null @@ -1,588 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "io" - "net" - "sync" - "testing" - "time" -) - -var ( - proxyCfg = &Config{ - Key: "api-key", - Secret: "api-secret", - Region: "us-west-1", - Endpoint: "http://localhost:9325", - Timeout: 1, - } - - proxy = &tcpProxy{ - listen: "localhost:9325", - upstream: "localhost:9324", - accept: true, - } - - proxyPipe = &jobs.Pipeline{ - "broker": "sqs", - "name": "default", - "queue": "test", - "lockReserved": 1, - "declare": map[string]interface{}{ - "MessageRetentionPeriod": 86400, - }, - } -) - -type tcpProxy struct { - listen string - upstream string - mu sync.Mutex - accept bool - conn []net.Conn -} - -func (p *tcpProxy) serve() { - l, err := net.Listen("tcp", p.listen) - if err != nil { - panic(err) - } - - for { - in, err := l.Accept() - if err != nil { - panic(err) - } - - if !p.accepting() { - in.Close() - } - - up, err := net.Dial("tcp", p.upstream) - if err != nil { - panic(err) - } - - go io.Copy(in, up) - go io.Copy(up, in) - - p.mu.Lock() - p.conn = append(p.conn, in, up) - p.mu.Unlock() - } -} - -// wait for specific number of connections -func (p *tcpProxy) waitConn(count int) *tcpProxy { - p.mu.Lock() - p.accept = true - p.mu.Unlock() - - for { - p.mu.Lock() - current := len(p.conn) - p.mu.Unlock() - - if current >= count*2 { - break - } - - time.Sleep(time.Millisecond) - } - - return p -} - -func (p *tcpProxy) reset(accept bool) int { - p.mu.Lock() - p.accept = accept - defer p.mu.Unlock() - - count := 0 - for _, conn := range p.conn { - conn.Close() - count++ - } - - p.conn = nil - return count / 2 -} - -func (p *tcpProxy) accepting() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.accept -} - -func init() { - go proxy.serve() -} - -func TestBroker_Durability_Base(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - // expect 2 connections - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Durability_Consume(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - time.Sleep(3 * time.Second) - proxy.waitConn(1) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume2(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1).reset(false) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.Error(t, perr) - - // restore - proxy.waitConn(2) - - jid, perr = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - proxy.reset(false) - - _, serr = b.Stat(proxyPipe) - assert.Error(t, serr) - - proxy.reset(true) - - _, serr = b.Stat(proxyPipe) - assert.NoError(t, serr) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume3(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - jid, perr := b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(1), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 1 { - break - } - } -} - -func TestBroker_Durability_Consume4(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - proxy.waitConn(1) - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "kill", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(proxyPipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Timeout: 2}, - }) - if err != nil { - t.Fatal(err) - } - - st, serr := b.Stat(proxyPipe) - assert.NoError(t, serr) - assert.Equal(t, int64(3), st.Queue+st.Active) - - mu := sync.Mutex{} - done := make(map[string]bool) - exec <- func(id string, j *jobs.Job) error { - mu.Lock() - defer mu.Unlock() - done[id] = true - - if j.Payload == "kill" { - proxy.reset(true) - } - - return nil - } - - for { - mu.Lock() - num := len(done) - mu.Unlock() - - if num >= 3 { - break - } - } -} - -func TestBroker_Durability_StopDead(t *testing.T) { - defer proxy.reset(true) - - b := &Broker{} - _, err := b.Init(proxyCfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(proxyPipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - - <-ready - - proxy.waitConn(1).reset(false) - - b.Stop() -} diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go deleted file mode 100644 index 50e2c164..00000000 --- a/plugins/jobs/oooold/broker/sqs/job.go +++ /dev/null @@ -1,80 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "time" -) - -var jobAttributes = []*string{ - aws.String("rr-job"), - aws.String("rr-maxAttempts"), - aws.String("rr-delay"), - aws.String("rr-timeout"), - aws.String("rr-retryDelay"), -} - -// pack job metadata into headers -func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { - return &sqs.SendMessageInput{ - QueueUrl: url, - DelaySeconds: aws.Int64(int64(j.Options.Delay)), - MessageBody: aws.String(j.Payload), - MessageAttributes: map[string]*sqs.MessageAttributeValue{ - "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, - "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, - "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, - "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, - "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, - }, - } -} - -// unpack restores jobs.Options -func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { - if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { - return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") - } - attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) - - for _, attr := range jobAttributes { - if _, ok := msg.MessageAttributes[*attr]; !ok { - return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) - } - } - - j = &jobs.Job{ - Job: *msg.MessageAttributes["rr-job"].StringValue, - Payload: *msg.Body, - Options: &jobs.Options{}, - } - - if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { - j.Options.Delay = delay - } - - if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { - j.Options.Attempts = maxAttempts - } - - if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { - j.Options.Timeout = timeout - } - - if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { - j.Options.RetryDelay = retryDelay - } - - return *msg.MessageId, attempt - 1, j, nil -} - -func awsString(n int) *string { - return aws.String(strconv.Itoa(n)) -} - -func awsDuration(d time.Duration) *string { - return aws.String(strconv.Itoa(int(d.Seconds()))) -} diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go deleted file mode 100644 index a120af53..00000000 --- a/plugins/jobs/oooold/broker/sqs/job_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package sqs - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_Unpack(t *testing.T) { - msg := &sqs.Message{ - Body: aws.String("body"), - Attributes: map[string]*string{}, - MessageAttributes: map[string]*sqs.MessageAttributeValue{}, - } - - _, _, _, err := unpack(msg) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go deleted file mode 100644 index 8a92448e..00000000 --- a/plugins/jobs/oooold/broker/sqs/queue.go +++ /dev/null @@ -1,266 +0,0 @@ -package sqs - -import ( - "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/spiral/jobs/v2" - "strconv" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - active int32 - pipe *jobs.Pipeline - url *string - reserve time.Duration - lockReserved time.Duration - - // queue events - lsn func(event int, ctx interface{}) - - // stop channel - wait chan interface{} - - // active operations - muw sync.RWMutex - wg sync.WaitGroup - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { - if pipe.String("queue", "") == "" { - return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) - } - - return &queue{ - pipe: pipe, - reserve: pipe.Duration("reserve", time.Second), - lockReserved: pipe.Duration("lockReserved", 300*time.Second), - lsn: lsn, - }, nil -} - -// declareQueue declared queue -func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { - attr := make(map[string]*string) - for k, v := range q.pipe.Map("declare") { - if vs, ok := v.(string); ok { - attr[k] = aws.String(vs) - } - - if vi, ok := v.(int); ok { - attr[k] = aws.String(strconv.Itoa(vi)) - } - - if vb, ok := v.(bool); ok { - if vb { - attr[k] = aws.String("true") - } else { - attr[k] = aws.String("false") - } - } - } - - if len(attr) != 0 { - r, err := s.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(q.pipe.String("queue", "")), - Attributes: attr, - }) - - return r.QueueUrl, err - } - - // no need to create (get existed) - r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) - if err != nil { - return nil, err - } - - return r.QueueUrl, nil -} - -// serve consumers -func (q *queue) serve(s *sqs.SQS, tout time.Duration) { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.active, 1) - - var errored bool - for { - messages, stop, err := q.consume(s) - if err != nil { - if errored { - // reoccurring error - time.Sleep(tout) - } else { - errored = true - q.report(err) - } - - continue - } - errored = false - - if stop { - return - } - - for _, msg := range messages { - h := <-q.execPool - go func(h jobs.Handler, msg *sqs.Message) { - err := q.do(s, h, msg) - q.execPool <- h - q.wg.Done() - q.report(err) - }(h, msg) - } - } -} - -// consume and allocate connection. -func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil, true, nil - default: - r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ - QueueUrl: q.url, - MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), - WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), - VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), - AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, - MessageAttributeNames: jobAttributes, - }) - if err != nil { - return nil, false, err - } - - q.wg.Add(len(r.Messages)) - - return r.Messages, false, nil - } -} - -// do single message -func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { - id, attempt, j, err := unpack(msg) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - // block the job based on known timeout - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), - }) - if err != nil { - go q.deleteMessage(s, msg, err) - return err - } - - err = h(id, j) - if err == nil { - return q.deleteMessage(s, msg, nil) - } - - q.errHandler(id, j, err) - - if !j.Options.CanRetry(attempt) { - return q.deleteMessage(s, msg, err) - } - - // retry after specified duration - _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ - QueueUrl: q.url, - ReceiptHandle: msg.ReceiptHandle, - VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), - }) - - return err -} - -func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { - _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) - return drr -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.active) == 0 { - return - } - - atomic.StoreInt32(&q.active, 0) - - close(q.wait) - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() -} - -// add job to the queue -func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { - r, err := s.SendMessage(pack(q.url, j)) - if err != nil { - return "", err - } - - return *r.MessageId, nil -} - -// return queue stats -func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { - r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ - QueueUrl: q.url, - AttributeNames: []*string{ - aws.String("ApproximateNumberOfMessages"), - aws.String("ApproximateNumberOfMessagesDelayed"), - aws.String("ApproximateNumberOfMessagesNotVisible"), - }, - }) - - if err != nil { - return nil, err - } - - stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} - - for a, v := range r.Attributes { - if a == "ApproximateNumberOfMessages" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Queue = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesNotVisible" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Active = int64(v) - } - } - - if a == "ApproximateNumberOfMessagesDelayed" { - if v, err := strconv.Atoi(*v); err == nil { - stat.Delayed = int64(v) - } - } - } - - return stat, nil -} - -// throw handles service, server and pool events. -func (q *queue) report(err error) { - if err != nil { - q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) - } -} diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go deleted file mode 100644 index 5031571b..00000000 --- a/plugins/jobs/oooold/broker/sqs/stat_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package sqs - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init(cfg) - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - // unable to use approximated stats - _, err = b.Stat(pipe) - assert.NoError(t, err) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - _, err := b.Stat(pipe) - assert.NoError(t, err) - - close(waitJob) - return nil - } - - <-waitJob - _, err = b.Stat(pipe) - assert.NoError(t, err) -} |