summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/jobs/broker.go47
-rw-r--r--plugins/jobs/broker/amqp/broker.go216
-rw-r--r--plugins/jobs/broker/amqp/broker_test.go419
-rw-r--r--plugins/jobs/broker/amqp/config.go39
-rw-r--r--plugins/jobs/broker/amqp/config_test.go27
-rw-r--r--plugins/jobs/broker/amqp/conn.go232
-rw-r--r--plugins/jobs/broker/amqp/consume_test.go258
-rw-r--r--plugins/jobs/broker/amqp/durability_test.go728
-rw-r--r--plugins/jobs/broker/amqp/job.go56
-rw-r--r--plugins/jobs/broker/amqp/job_test.go29
-rw-r--r--plugins/jobs/broker/amqp/queue.go302
-rw-r--r--plugins/jobs/broker/amqp/stat_test.go63
-rw-r--r--plugins/jobs/broker/beanstalk/broker.go185
-rw-r--r--plugins/jobs/broker/beanstalk/broker_test.go276
-rw-r--r--plugins/jobs/broker/beanstalk/config.go50
-rw-r--r--plugins/jobs/broker/beanstalk/config_test.go47
-rw-r--r--plugins/jobs/broker/beanstalk/conn.go180
-rw-r--r--plugins/jobs/broker/beanstalk/constants.go6
-rw-r--r--plugins/jobs/broker/beanstalk/consume_test.go242
-rw-r--r--plugins/jobs/broker/beanstalk/durability_test.go575
-rw-r--r--plugins/jobs/broker/beanstalk/job.go24
-rw-r--r--plugins/jobs/broker/beanstalk/sock.bean0
-rw-r--r--plugins/jobs/broker/beanstalk/stat_test.go66
-rw-r--r--plugins/jobs/broker/beanstalk/tube.go250
-rw-r--r--plugins/jobs/broker/beanstalk/tube_test.go18
-rw-r--r--plugins/jobs/broker/ephemeral/broker.go174
-rw-r--r--plugins/jobs/broker/ephemeral/broker_test.go221
-rw-r--r--plugins/jobs/broker/ephemeral/consume_test.go253
-rw-r--r--plugins/jobs/broker/ephemeral/queue.go161
-rw-r--r--plugins/jobs/broker/ephemeral/stat_test.go64
-rw-r--r--plugins/jobs/broker/sqs/broker.go189
-rw-r--r--plugins/jobs/broker/sqs/broker_test.go275
-rw-r--r--plugins/jobs/broker/sqs/config.go82
-rw-r--r--plugins/jobs/broker/sqs/config_test.go48
-rw-r--r--plugins/jobs/broker/sqs/consume_test.go370
-rw-r--r--plugins/jobs/broker/sqs/durability_test.go588
-rw-r--r--plugins/jobs/broker/sqs/job.go80
-rw-r--r--plugins/jobs/broker/sqs/job_test.go19
-rw-r--r--plugins/jobs/broker/sqs/queue.go266
-rw-r--r--plugins/jobs/broker/sqs/stat_test.go60
-rw-r--r--plugins/jobs/broker_test.go314
-rw-r--r--plugins/jobs/config.go91
-rw-r--r--plugins/jobs/config_test.go158
-rw-r--r--plugins/jobs/dispatcher.go47
-rw-r--r--plugins/jobs/dispatcher_test.go53
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio1
-rw-r--r--plugins/jobs/event.go96
-rw-r--r--plugins/jobs/event_test.go52
-rw-r--r--plugins/jobs/job.go38
-rw-r--r--plugins/jobs/job_options.go70
-rw-r--r--plugins/jobs/job_options_test.go109
-rw-r--r--plugins/jobs/job_test.go18
-rw-r--r--plugins/jobs/pipeline.go169
-rw-r--r--plugins/jobs/pipeline_test.go89
-rw-r--r--plugins/jobs/rpc.go151
-rw-r--r--plugins/jobs/rpc_test.go657
-rw-r--r--plugins/jobs/service.go327
-rw-r--r--plugins/jobs/service_test.go458
-rw-r--r--plugins/jobs/tests/.rr.yaml63
-rw-r--r--plugins/jobs/tests/Jobs/Amqp/BrokerTest.php20
-rw-r--r--plugins/jobs/tests/Jobs/Amqp/ErrorJob.php22
-rw-r--r--plugins/jobs/tests/Jobs/Amqp/Job.php26
-rw-r--r--plugins/jobs/tests/Jobs/BaseTest.php115
-rw-r--r--plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php20
-rw-r--r--plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php22
-rw-r--r--plugins/jobs/tests/Jobs/Beanstalk/Job.php26
-rw-r--r--plugins/jobs/tests/Jobs/Local/BrokerTest.php20
-rw-r--r--plugins/jobs/tests/Jobs/Local/ErrorJob.php22
-rw-r--r--plugins/jobs/tests/Jobs/Local/Job.php26
-rw-r--r--plugins/jobs/tests/Jobs/OptionsTest.php34
-rw-r--r--plugins/jobs/tests/Jobs/RegistryTest.php43
-rw-r--r--plugins/jobs/tests/Jobs/ShortCircuitTest.php90
-rw-r--r--plugins/jobs/tests/Jobs/Sqs/BrokerTest.php20
-rw-r--r--plugins/jobs/tests/Jobs/Sqs/ErrorJob.php22
-rw-r--r--plugins/jobs/tests/Jobs/Sqs/Job.php26
-rw-r--r--plugins/jobs/tests/bootstrap.php16
-rw-r--r--plugins/jobs/tests/consumer.php22
-rw-r--r--plugins/jobs/tests/docker-compose.yml22
78 files changed, 10760 insertions, 0 deletions
diff --git a/plugins/jobs/broker.go b/plugins/jobs/broker.go
new file mode 100644
index 00000000..0066a4f1
--- /dev/null
+++ b/plugins/jobs/broker.go
@@ -0,0 +1,47 @@
+package jobs
+
+// Broker manages set of pipelines and provides ability to push jobs into them.
+type Broker interface {
+ // Register broker pipeline.
+ Register(pipe *Pipeline) error
+
+// Consume configures pipeline to be consumed. Set execPool to nil to disable consuming. Method can be called before
+ // the service is started!
+ Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error
+
+ // Push job into the worker.
+ Push(pipe *Pipeline, j *Job) (string, error)
+
+ // Stat must fetch statistics about given pipeline or return error.
+ Stat(pipe *Pipeline) (stat *Stat, err error)
+}
+
+// EventProvider defines the ability to throw events for the broker.
+type EventProvider interface {
+ // Listen attaches the event listener.
+ Listen(lsn func(event int, ctx interface{}))
+}
+
+// Stat contains information about pipeline.
+type Stat struct {
+ // Pipeline name.
+ Pipeline string
+
+ // Broker is the name of the associated broker.
+ Broker string
+
+ // InternalName defines internal broker specific pipeline name.
+ InternalName string
+
+ // Consuming indicates that the pipeline is consuming jobs.
+ Consuming bool
+
+ // Queue defines the number of pending jobs.
+ Queue int64
+
+ // Active defines number of jobs which are currently being processed.
+ Active int64
+
+ // Delayed defines the number of jobs which are delayed for later execution.
+ Delayed int64
+}
diff --git a/plugins/jobs/broker/amqp/broker.go b/plugins/jobs/broker/amqp/broker.go
new file mode 100644
index 00000000..b47d83ee
--- /dev/null
+++ b/plugins/jobs/broker/amqp/broker.go
@@ -0,0 +1,216 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "github.com/spiral/jobs/v2"
+ "sync"
+ "sync/atomic"
+)
+
+// Broker represents AMQP broker.
+type Broker struct {
+ cfg *Config
+ lsn func(event int, ctx interface{})
+ publish *chanPool
+ consume *chanPool
+ mu sync.Mutex
+ wait chan error
+ stopped chan interface{}
+ queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures AMQP job broker (always 2 connections).
+func (b *Broker) Init(cfg *Config) (ok bool, err error) {
+ b.cfg = cfg
+ b.queues = make(map[*jobs.Pipeline]*queue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+ }
+
+ q, err := newQueue(pipe, b.throw)
+ if err != nil {
+ return err
+ }
+
+ b.queues[pipe] = q
+
+ return nil
+}
+
+// Serve broker pipelines.
+func (b *Broker) Serve() (err error) {
+ b.mu.Lock()
+
+ if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.publish.Close()
+
+ if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ defer b.consume.Close()
+
+ for _, q := range b.queues {
+ err := q.declare(b.publish, q.name, q.key, nil)
+ if err != nil {
+ b.mu.Unlock()
+ return err
+ }
+ }
+
+ for _, q := range b.queues {
+ qq := q
+ if qq.execPool != nil {
+ go qq.serve(b.publish, b.consume)
+ }
+ }
+
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
+// Stop all pipelines.
+func (b *Broker) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return
+ }
+
+ for _, q := range b.queues {
+ q.stop()
+ }
+
+ close(b.wait)
+ <-b.stopped
+}
+
+// Consume configures pipeline to be consumed. Set execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ q.stop()
+
+ q.execPool = execPool
+ q.errHandler = errHandler
+
+ if b.publish != nil && q.execPool != nil {
+ if q.execPool != nil {
+ go q.serve(b.publish, b.consume)
+ }
+ }
+
+ return nil
+}
+
+// Push job into the worker.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ id, err := uuid.NewV4()
+ if err != nil {
+ return "", err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
+ return "", err
+ }
+
+ return id.String(), nil
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ queue, err := q.inspect(b.publish)
+ if err != nil {
+ return nil, err
+ }
+
+ // this is the closest approximation we can get for now
+ return &jobs.Stat{
+ InternalName: queue.Name,
+ Queue: int64(queue.Messages),
+ Active: int64(atomic.LoadInt32(&q.running)),
+ }, nil
+}
+
+// check if broker is serving
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// queue returns queue associated with the pipeline.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
+// throw handles service, server and pool events.
+func (b *Broker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}
diff --git a/plugins/jobs/broker/amqp/broker_test.go b/plugins/jobs/broker/amqp/broker_test.go
new file mode 100644
index 00000000..66078099
--- /dev/null
+++ b/plugins/jobs/broker/amqp/broker_test.go
@@ -0,0 +1,419 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+var (
+ pipe = &jobs.Pipeline{
+ "broker": "amqp",
+ "name": "default",
+ "queue": "rr-queue",
+ "exchange": "rr-exchange",
+ "prefetch": 1,
+ }
+
+ cfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5672/",
+ }
+)
+
+var (
+ fanoutPipe = &jobs.Pipeline{
+ "broker": "amqp",
+ "name": "fanout",
+ "queue": "fanout-queue",
+ "exchange": "fanout-exchange",
+ "exchange-type": "fanout",
+ "prefetch": 1,
+ }
+
+ fanoutCfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5672/",
+ }
+)
+
+func TestBroker_Init(t *testing.T) {
+ b := &Broker{}
+ ok, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.True(t, ok)
+ assert.NoError(t, err)
+}
+
+func TestBroker_StopNotStarted(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ b.Stop()
+}
+
+func TestBroker_Register(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+}
+
+func TestBroker_Register_Twice(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+ assert.Error(t, b.Register(pipe))
+}
+
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_Undefined(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+func TestBroker_Consume_BadPipeline(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Error(t, b.Register(&jobs.Pipeline{
+ "broker": "amqp",
+ "name": "default",
+ "exchange": "rr-exchange",
+ "prefetch": 1,
+ }))
+}
+
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Consume(pipe, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_CantStart(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(&Config{
+ Addr: "amqp://guest:guest@localhost:15672/",
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Serve())
+}
+
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ err = b.Consume(pipe, exec, errf)
+ if err != nil {
+ t.Fatal()
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_PushToNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_PushToNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_Queue_RoutingKey(t *testing.T) {
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
+}
+
+func TestBroker_Register_With_RoutingKey(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ assert.NoError(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Consume_With_RoutingKey(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
+
+ err = b.Register(&pipeWithKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(&pipeWithKey, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Queue_ExchangeType(t *testing.T) {
+ pipeWithKey := pipe.With("exchange-type", "direct")
+
+ assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
+}
+
+func TestBroker_Register_With_ExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("exchange-type", "fanout")
+
+ assert.NoError(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := pipe.With("exchange-type", "xxx")
+
+ assert.Error(t, b.Register(&pipeWithKey))
+}
+
+func TestBroker_Consume_With_ExchangeType(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(fanoutCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
+
+ err = b.Register(&pipeWithKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(&pipeWithKey, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
diff --git a/plugins/jobs/broker/amqp/config.go b/plugins/jobs/broker/amqp/config.go
new file mode 100644
index 00000000..0ed3a50e
--- /dev/null
+++ b/plugins/jobs/broker/amqp/config.go
@@ -0,0 +1,39 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Config defines AMQP broker configuration.
+type Config struct {
+ // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
+ Addr string
+
+ // Timeout to allocate the connection. Default 10 seconds.
+ Timeout int
+}
+
+// Hydrate config values.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.Addr == "" {
+ return fmt.Errorf("AMQP address is missing")
+ }
+
+ return nil
+}
+
+// TimeoutDuration returns number of seconds allowed to redial
+func (c *Config) TimeoutDuration() time.Duration {
+ timeout := c.Timeout
+ if timeout == 0 {
+ timeout = 10
+ }
+
+ return time.Duration(timeout) * time.Second
+}
diff --git a/plugins/jobs/broker/amqp/config_test.go b/plugins/jobs/broker/amqp/config_test.go
new file mode 100644
index 00000000..1abbb55d
--- /dev/null
+++ b/plugins/jobs/broker/amqp/config_test.go
@@ -0,0 +1,27 @@
+package amqp
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &mockCfg{`{"dead`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"addr":""}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
diff --git a/plugins/jobs/broker/amqp/conn.go b/plugins/jobs/broker/amqp/conn.go
new file mode 100644
index 00000000..be747776
--- /dev/null
+++ b/plugins/jobs/broker/amqp/conn.go
@@ -0,0 +1,232 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/streadway/amqp"
+ "sync"
+ "time"
+)
+
+// manages set of AMQP channels
+type chanPool struct {
+ // timeout to backoff redial
+ tout time.Duration
+ url string
+
+ mu *sync.Mutex
+
+ conn *amqp.Connection
+ channels map[string]*channel
+ wait chan interface{}
+ connected chan interface{}
+}
+
+// manages single channel
+type channel struct {
+ ch *amqp.Channel
+ // todo unused
+ //consumer string
+ confirm chan amqp.Confirmation
+ signal chan error
+}
+
+// newConn creates new watched AMQP connection
+func newConn(url string, tout time.Duration) (*chanPool, error) {
+ conn, err := dial(url)
+ if err != nil {
+ return nil, err
+ }
+
+ cp := &chanPool{
+ url: url,
+ tout: tout,
+ conn: conn,
+ mu: &sync.Mutex{},
+ channels: make(map[string]*channel),
+ wait: make(chan interface{}),
+ connected: make(chan interface{}),
+ }
+
+ close(cp.connected)
+ go cp.watch()
+ return cp, nil
+}
+
+// dial dials to AMQP.
+func dial(url string) (*amqp.Connection, error) {
+ return amqp.Dial(url)
+}
+
+// Close gracefully closes all underlying channels and connection.
+func (cp *chanPool) Close() error {
+ cp.mu.Lock()
+
+ close(cp.wait)
+ if cp.channels == nil {
+ return fmt.Errorf("connection is dead")
+ }
+
+ // close all channels and consumers
+ var wg sync.WaitGroup
+ for _, ch := range cp.channels {
+ wg.Add(1)
+
+ go func(ch *channel) {
+ defer wg.Done()
+ cp.closeChan(ch, nil)
+ }(ch)
+ }
+ cp.mu.Unlock()
+
+ wg.Wait()
+
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ if cp.conn != nil {
+ return cp.conn.Close()
+ }
+
+ return nil
+}
+
+// waitConnected waits till connection is connected again or eventually closed.
+// must only be invoked after connection error has been delivered to channel.signal.
+func (cp *chanPool) waitConnected() chan interface{} {
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ return cp.connected
+}
+
+// watch manages connection state and reconnects if needed
+func (cp *chanPool) watch() {
+ for {
+ select {
+ case <-cp.wait:
+ // connection has been closed
+ return
+ // here we are waiting for the errors from amqp connection
+ case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
+ cp.mu.Lock()
+ // clear connected, since connections are dead
+ cp.connected = make(chan interface{})
+
+ // broadcast the error to all consumers so they can attempt to reconnect
+ for _, ch := range cp.channels {
+ ch.signal <- err
+ }
+
+ // disable channel allocation while server is dead
+ cp.conn = nil
+ cp.channels = nil
+
+ // initialize the backoff
+ expb := backoff.NewExponentialBackOff()
+ expb.MaxInterval = cp.tout
+ cp.mu.Unlock()
+
+ // reconnect function
+ reconnect := func() error {
+ cp.mu.Lock()
+ conn, err := dial(cp.url)
+ if err != nil {
+ // still failing
+ fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
+ cp.mu.Unlock()
+ return err
+ }
+
+ // TODO ADD LOGGING
+ fmt.Println("------amqp successfully redialed------")
+
+ // here we are reconnected
+ // replace the connection
+ cp.conn = conn
+ // re-init the channels
+ cp.channels = make(map[string]*channel)
+ cp.mu.Unlock()
+ return nil
+ }
+
+ // start backoff retry
+ errb := backoff.Retry(reconnect, expb)
+ if errb != nil {
+ fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
+ // reconnection failed
+ close(cp.connected)
+ return
+ }
+ close(cp.connected)
+ }
+ }
+}
+
+// channel allocates new channel on amqp connection
+func (cp *chanPool) channel(name string) (*channel, error) {
+ cp.mu.Lock()
+ dead := cp.conn == nil
+ cp.mu.Unlock()
+
+ if dead {
+ // wait for connection restoration (doubled the timeout duration)
+ select {
+ case <-time.NewTimer(cp.tout * 2).C:
+ return nil, fmt.Errorf("connection is dead")
+ case <-cp.connected:
+ // connected
+ }
+ }
+
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ if cp.conn == nil {
+ return nil, fmt.Errorf("connection has been closed")
+ }
+
+ if ch, ok := cp.channels[name]; ok {
+ return ch, nil
+ }
+
+ // we must create new channel
+ ch, err := cp.conn.Channel()
+ if err != nil {
+ return nil, err
+ }
+
+ // Enable publish confirmations
+ if err = ch.Confirm(false); err != nil {
+ return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err)
+ }
+
+ // we expect that every allocated channel would have listener on signal
+ // this is not true only in case of pure producing channels
+ cp.channels[name] = &channel{
+ ch: ch,
+ confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
+ signal: make(chan error, 1),
+ }
+
+ return cp.channels[name], nil
+}
+
+// closeChan gracefully closes and removes channel allocation.
+func (cp *chanPool) closeChan(c *channel, err error) error {
+ cp.mu.Lock()
+ defer cp.mu.Unlock()
+
+ go func() {
+ c.signal <- nil
+ c.ch.Close()
+ }()
+
+ for name, ch := range cp.channels {
+ if ch == c {
+ delete(cp.channels, name)
+ }
+ }
+
+ return err
+}
diff --git a/plugins/jobs/broker/amqp/consume_test.go b/plugins/jobs/broker/amqp/consume_test.go
new file mode 100644
index 00000000..28999c36
--- /dev/null
+++ b/plugins/jobs/broker/amqp/consume_test.go
@@ -0,0 +1,258 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Consume_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_Delayed(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ start := time.Now()
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Delay: 1},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ elapsed := time.Since(start)
+ assert.True(t, elapsed >= time.Second)
+ assert.True(t, elapsed < 3*time.Second)
+}
+
+func TestBroker_Consume_Errored(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ close(errHandled)
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return fmt.Errorf("job failed")
+ }
+
+ <-waitJob
+ <-errHandled
+}
+
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ attempts := 0
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ attempts++
+ errHandled <- nil
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Attempts: 3},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ return fmt.Errorf("job failed")
+ }
+
+ <-errHandled
+ <-errHandled
+ <-errHandled
+ assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/broker/amqp/durability_test.go b/plugins/jobs/broker/amqp/durability_test.go
new file mode 100644
index 00000000..00d62c51
--- /dev/null
+++ b/plugins/jobs/broker/amqp/durability_test.go
@@ -0,0 +1,728 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
+var (
+ proxyCfg = &Config{
+ Addr: "amqp://guest:guest@localhost:5673/",
+ Timeout: 1,
+ }
+
+ proxy = &tcpProxy{
+ listen: "localhost:5673",
+ upstream: "localhost:5672",
+ accept: true,
+ }
+)
+
+// tcpProxy is a controllable TCP relay used by durability tests to simulate
+// broker connection loss between the client and the AMQP server.
+type tcpProxy struct {
+	listen   string     // address the proxy listens on
+	upstream string     // address of the real AMQP server
+	mu       sync.Mutex // guards accept and conn
+	accept   bool       // when false, new client connections are rejected
+	conn     []net.Conn // all live connections (client and upstream, in pairs)
+}
+
+// serve accepts client connections and relays them to the upstream address.
+// While the proxy is not accepting, incoming connections are closed
+// immediately and no upstream connection is created. Listener/dial failures
+// panic, which is acceptable inside the test harness.
+func (p *tcpProxy) serve() {
+	l, err := net.Listen("tcp", p.listen)
+	if err != nil {
+		panic(err)
+	}
+
+	for {
+		in, err := l.Accept()
+		if err != nil {
+			panic(err)
+		}
+
+		if !p.accepting() {
+			// reject the client; previously the code fell through here,
+			// dialing upstream and registering the already-closed conn,
+			// which inflated waitConn counts while "down"
+			in.Close()
+			continue
+		}
+
+		up, err := net.Dial("tcp", p.upstream)
+		if err != nil {
+			panic(err)
+		}
+
+		// shovel bytes both ways until either side closes
+		go io.Copy(in, up)
+		go io.Copy(up, in)
+
+		p.mu.Lock()
+		p.conn = append(p.conn, in, up)
+		p.mu.Unlock()
+	}
+}
+
+// waitConn re-enables accepting and blocks until at least count proxied
+// sessions (one client plus one upstream conn each) are live. Returns the
+// proxy for call chaining.
+func (p *tcpProxy) waitConn(count int) *tcpProxy {
+	p.mu.Lock()
+	p.accept = true
+	p.mu.Unlock()
+
+	// each session contributes two entries to p.conn
+	want := count * 2
+	for {
+		p.mu.Lock()
+		have := len(p.conn)
+		p.mu.Unlock()
+
+		if have >= want {
+			return p
+		}
+
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// reset closes every tracked connection, sets the accepting flag and
+// returns the number of proxied sessions that were torn down.
+func (p *tcpProxy) reset(accept bool) int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.accept = accept
+
+	for _, cn := range p.conn {
+		cn.Close()
+	}
+
+	// two conns per session
+	sessions := len(p.conn) / 2
+	p.conn = nil
+	return sessions
+}
+
+// accepting reports whether new client connections are currently allowed.
+func (p *tcpProxy) accepting() bool {
+	p.mu.Lock()
+	v := p.accept
+	p.mu.Unlock()
+	return v
+}
+
+// start the shared proxy before any durability test runs
+func init() {
+	go proxy.serve()
+}
+
+// TestBroker_Durability_Base is the happy path through the proxy: with the
+// connection healthy, push and consume work end to end.
+func TestBroker_Durability_Base(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	// drain leftovers from previous runs so the assertions see a clean queue
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	// expect 2 connections
+	proxy.waitConn(2)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	waitJob := make(chan interface{})
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		close(waitJob)
+		return nil
+	}
+
+	<-waitJob
+}
+
+// TestBroker_Durability_Consume verifies that Push fails while the proxied
+// connection is down and succeeds again after the proxy is restored.
+func TestBroker_Durability_Consume(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	// drop the connection
+	proxy.waitConn(2).reset(false)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.Error(t, perr)
+
+	// restore
+	proxy.waitConn(2)
+
+	jid, perr = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 1 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_Consume_LongTimeout exercises reconnection after the
+// connection stays down longer than the configured broker timeout.
+func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	// drop the connection
+	proxy.waitConn(1).reset(false)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.Error(t, perr)
+
+	jid, perr = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.Error(t, perr)
+
+	// restore after the 1s broker timeout has long expired
+	time.Sleep(3 * time.Second)
+	proxy.waitConn(1)
+
+	jid, perr = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{Timeout: 2},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NotEqual(t, "0", jid)
+
+	assert.NoError(t, perr)
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 1 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_Consume2 drops the connection again after a
+// successful push and verifies the consumer still delivers the job after
+// reconnecting.
+func TestBroker_Durability_Consume2(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	proxy.waitConn(2).reset(false)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.Error(t, perr)
+
+	// restore
+	proxy.waitConn(2)
+
+	jid, perr = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+	if perr != nil {
+		panic(perr)
+	}
+
+	// kill connections once more; the broker must recover on its own
+	proxy.reset(true)
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 1 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_Consume2_2 starts consuming while the connection is
+// dead and checks Stat failure plus eventual delivery after recovery.
+func TestBroker_Durability_Consume2_2(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	proxy.waitConn(2).reset(false)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.Error(t, perr)
+
+	// start when connection is dead
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	// restore
+	proxy.waitConn(2)
+
+	jid, perr = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+	if perr != nil {
+		panic(perr)
+	}
+
+	// Stat must fail while the connection is down
+	proxy.reset(false)
+
+	_, serr := b.Stat(pipe)
+	assert.Error(t, serr)
+
+	proxy.reset(true)
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 1 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_Consume3 is a plain consume-through-proxy run used
+// as a durability baseline.
+func TestBroker_Durability_Consume3(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	proxy.waitConn(2)
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+	if perr != nil {
+		panic(perr)
+	}
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 1 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_Consume4 pushes three jobs, kills the connection
+// while the first ("kill") job is in flight and expects all three to be
+// delivered eventually after recovery.
+func TestBroker_Durability_Consume4(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	ch, err := b.consume.channel("purger")
+	if err != nil {
+		panic(err)
+	}
+	_, err = ch.ch.QueuePurge("rr-queue", false)
+	if err != nil {
+		panic(err)
+	}
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	proxy.waitConn(2)
+
+	_, err = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "kill",
+		Options: &jobs.Options{},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mu := sync.Mutex{}
+	done := make(map[string]bool)
+	exec <- func(id string, j *jobs.Job) error {
+		// read done under the mutex: the original accessed len(done)
+		// unguarded, a data race under `go test -race`
+		mu.Lock()
+		first := len(done) == 0
+		mu.Unlock()
+
+		if j.Payload == "kill" && first {
+			proxy.reset(true)
+		}
+
+		mu.Lock()
+		defer mu.Unlock()
+		done[id] = true
+
+		return nil
+	}
+
+	for {
+		mu.Lock()
+		num := len(done)
+		mu.Unlock()
+
+		if num >= 3 {
+			break
+		}
+
+		// yield instead of busy-spinning a core while waiting
+		time.Sleep(time.Millisecond)
+	}
+}
+
+// TestBroker_Durability_StopDead ensures Stop does not hang when the
+// underlying connection is already dead.
+func TestBroker_Durability_StopDead(t *testing.T) {
+	defer proxy.reset(true)
+
+	b := &Broker{}
+	_, err := b.Init(proxyCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	go func() { assert.NoError(t, b.Serve()) }()
+
+	<-ready
+
+	// kill every connection, then stop the dead broker
+	proxy.waitConn(2).reset(false)
+
+	b.Stop()
+}
diff --git a/plugins/jobs/broker/amqp/job.go b/plugins/jobs/broker/amqp/job.go
new file mode 100644
index 00000000..bd559715
--- /dev/null
+++ b/plugins/jobs/broker/amqp/job.go
@@ -0,0 +1,56 @@
+package amqp
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/streadway/amqp"
+)
+
+// pack serializes the job id, the current attempt number and all job options
+// into AMQP headers so unpack can restore them on the consumer side. Numeric
+// options are stored as int64, the AMQP table integer type.
+func pack(id string, attempt int, j *jobs.Job) amqp.Table {
+	return amqp.Table{
+		"rr-id":          id,
+		"rr-job":         j.Job,
+		"rr-attempt":     int64(attempt),
+		"rr-maxAttempts": int64(j.Options.Attempts),
+		"rr-timeout":     int64(j.Options.Timeout),
+		"rr-delay":       int64(j.Options.Delay),
+		"rr-retryDelay":  int64(j.Options.RetryDelay),
+	}
+}
+
+// unpack restores the job id, attempt number and jobs.Options from the
+// delivery headers written by pack. rr-id, rr-attempt and rr-job are
+// mandatory; the remaining option headers default to zero when absent.
+func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
+	j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
+
+	// single comma-ok assertions instead of the previous duplicated
+	// lookup-then-assert pattern
+	id, ok := d.Headers["rr-id"].(string)
+	if !ok {
+		return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
+	}
+
+	attempt64, ok := d.Headers["rr-attempt"].(int64)
+	if !ok {
+		return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
+	}
+
+	job, ok := d.Headers["rr-job"].(string)
+	if !ok {
+		return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
+	}
+	j.Job = job
+
+	// optional options: absent or mistyped headers keep the zero value
+	if v, ok := d.Headers["rr-maxAttempts"].(int64); ok {
+		j.Options.Attempts = int(v)
+	}
+	if v, ok := d.Headers["rr-timeout"].(int64); ok {
+		j.Options.Timeout = int(v)
+	}
+	if v, ok := d.Headers["rr-delay"].(int64); ok {
+		j.Options.Delay = int(v)
+	}
+	if v, ok := d.Headers["rr-retryDelay"].(int64); ok {
+		j.Options.RetryDelay = int(v)
+	}
+
+	return id, int(attempt64), j, nil
+}
diff --git a/plugins/jobs/broker/amqp/job_test.go b/plugins/jobs/broker/amqp/job_test.go
new file mode 100644
index 00000000..24ca453b
--- /dev/null
+++ b/plugins/jobs/broker/amqp/job_test.go
@@ -0,0 +1,29 @@
+package amqp
+
+import (
+ "github.com/streadway/amqp"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// Test_Unpack_Errors checks that unpack rejects deliveries missing any of
+// the mandatory headers (rr-id, rr-attempt, rr-job), one at a time.
+func Test_Unpack_Errors(t *testing.T) {
+	// no headers at all -> missing rr-id
+	_, _, _, err := unpack(amqp.Delivery{
+		Headers: map[string]interface{}{},
+	})
+	assert.Error(t, err)
+
+	// rr-id present -> missing rr-attempt
+	_, _, _, err = unpack(amqp.Delivery{
+		Headers: map[string]interface{}{
+			"rr-id": "id",
+		},
+	})
+	assert.Error(t, err)
+
+	// rr-id and rr-attempt present -> missing rr-job
+	_, _, _, err = unpack(amqp.Delivery{
+		Headers: map[string]interface{}{
+			"rr-id":      "id",
+			"rr-attempt": int64(0),
+		},
+	})
+	assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/amqp/queue.go b/plugins/jobs/broker/amqp/queue.go
new file mode 100644
index 00000000..6ef5f20f
--- /dev/null
+++ b/plugins/jobs/broker/amqp/queue.go
@@ -0,0 +1,302 @@
+package amqp
+
+import (
+ "errors"
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/streadway/amqp"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// ExchangeType is the AMQP exchange kind configured for a pipeline.
+type ExchangeType string
+
+// Supported exchange kinds.
+const (
+	Direct  ExchangeType = "direct"
+	Fanout  ExchangeType = "fanout"
+	Topic   ExchangeType = "topic"
+	Headers ExchangeType = "headers"
+)
+
+// IsValid returns nil when the exchange type is one of the supported kinds.
+func (et ExchangeType) IsValid() error {
+	if et == Direct || et == Fanout || et == Topic || et == Headers {
+		return nil
+	}
+	return errors.New("unknown exchange-type")
+}
+
+// String renders the exchange type name, falling back to "direct" for any
+// unknown value.
+func (et ExchangeType) String() string {
+	if et.IsValid() != nil {
+		return "direct"
+	}
+	return string(et)
+}
+
+
+// queue wraps a single AMQP queue together with its consumer state, exec
+// handler pool and lifecycle flags.
+type queue struct {
+	active       int32          // 1 while serve() should keep consuming (atomic)
+	pipe         *jobs.Pipeline // owning pipeline configuration
+	exchange     string
+	exchangeType ExchangeType
+	name, key    string // queue name and routing key
+	consumer     string // consumer tag used for Consume/Cancel
+
+	// active consuming channel
+	muc sync.Mutex
+	cc  *channel
+
+	// queue events
+	lsn func(event int, ctx interface{})
+
+	// active operations
+	muw sync.RWMutex
+	wg  sync.WaitGroup
+
+	// exec handlers
+	running    int32 // number of in-flight jobs (atomic)
+	execPool   chan jobs.Handler
+	errHandler jobs.ErrorHandler
+}
+
+// newQueue creates a new queue wrapper for AMQP. The pipeline must declare a
+// `queue` name; `exchange`, `exchange-type`, `routing-key` and `consumer`
+// fall back to sensible defaults.
+func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
+	if pipe.String("queue", "") == "" {
+		return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
+	}
+
+	exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
+
+	// return the validation error as-is; wrapping it via
+	// fmt.Errorf(err.Error()) used a non-constant format string (vet) and
+	// lost nothing but the original error value
+	if err := exchangeType.IsValid(); err != nil {
+		return nil, err
+	}
+
+	return &queue{
+		exchange:     pipe.String("exchange", "amqp.direct"),
+		exchangeType: exchangeType,
+		name:         pipe.String("queue", ""),
+		key:          pipe.String("routing-key", pipe.String("queue", "")),
+		consumer:     pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
+		pipe:         pipe,
+		lsn:          lsn,
+	}, nil
+}
+
+// serve blocks and consumes the queue until stop() flips q.active. Each time
+// the consume connection is (re)established it re-acquires a consuming
+// channel and drains deliveries, dispatching each to a handler from the pool.
+func (q *queue) serve(publish, consume *chanPool) {
+	atomic.StoreInt32(&q.active, 1)
+
+	for {
+		// wait for the consume pool to report a live connection
+		<-consume.waitConnected()
+		if atomic.LoadInt32(&q.active) == 0 {
+			// stopped
+			return
+		}
+
+		delivery, cc, err := q.consume(consume)
+		if err != nil {
+			q.report(err)
+			continue
+		}
+
+		// remember the consuming channel so stop() can Cancel it
+		q.muc.Lock()
+		q.cc = cc
+		q.muc.Unlock()
+
+		// delivery closes when the channel/connection dies; the outer
+		// loop then reconnects
+		for d := range delivery {
+			q.muw.Lock()
+			q.wg.Add(1)
+			q.muw.Unlock()
+
+			// blocks until a handler is free, providing backpressure
+			atomic.AddInt32(&q.running, 1)
+			h := <-q.execPool
+
+			go func(h jobs.Handler, d amqp.Delivery) {
+				err := q.do(publish, h, d)
+
+				// ^int32(0) == -1: decrement the in-flight counter
+				atomic.AddInt32(&q.running, ^int32(0))
+				q.execPool <- h
+				q.wg.Done()
+				q.report(err)
+			}(h, d)
+		}
+	}
+}
+
+// consume allocates a channel from the pool, applies the configured prefetch
+// and starts consuming the queue. A watcher goroutine closes the channel back
+// into the pool when the channel signals an error.
+func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) {
+	// allocate channel for the consuming
+	if cc, err = consume.channel(q.name); err != nil {
+		return nil, nil, err
+	}
+
+	// prefetch bounds the number of unacked deliveries per consumer
+	if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil {
+		return nil, nil, consume.closeChan(cc, err)
+	}
+
+	delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil)
+	if err != nil {
+		return nil, nil, consume.closeChan(cc, err)
+	}
+
+	// watch the channel's error signal; first error tears the channel down
+	// and the goroutine exits (also exits when cc.signal is closed)
+	go func(consume *chanPool) {
+		for err := range cc.signal {
+			consume.closeChan(cc, err)
+			return
+		}
+	}(consume)
+
+	return delivery, cc, err
+}
+
+// do executes a single delivery: unpack, run the handler, then ack/nack or
+// re-publish for retry depending on the outcome and the job's retry policy.
+func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error {
+	id, attempt, j, err := unpack(d)
+	if err != nil {
+		// malformed message: report and drop (nack without requeue)
+		q.report(err)
+		return d.Nack(false, false)
+	}
+	err = h(id, j)
+
+	if err == nil {
+		return d.Ack(false)
+	}
+
+	// failed
+	q.errHandler(id, j, err)
+
+	if !j.Options.CanRetry(attempt) {
+		// retries exhausted: drop the message
+		return d.Nack(false, false)
+	}
+
+	// retry as new j (to accommodate attempt number and new delay)
+	if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil {
+		// re-publish failed: requeue the original delivery instead
+		q.report(err)
+		return d.Nack(false, true)
+	}
+
+	return d.Ack(false)
+}
+
+// stop cancels the active consumer (if any) and waits for all in-flight
+// jobs to finish. Safe to call when the queue was never started.
+func (q *queue) stop() {
+	if atomic.LoadInt32(&q.active) == 0 {
+		return
+	}
+
+	atomic.StoreInt32(&q.active, 0)
+
+	q.muc.Lock()
+	if q.cc != nil {
+		// gracefully stopped consuming
+		q.report(q.cc.ch.Cancel(q.consumer, true))
+	}
+	q.muc.Unlock()
+
+	// muw excludes new deliveries from registering while we drain
+	q.muw.Lock()
+	q.wg.Wait()
+	q.muw.Unlock()
+}
+
+// publish sends a message to the queue, or — when delay is non-zero — to a
+// per-delay dead-letter queue whose TTL routes the message back to the main
+// queue after the delay expires. Waits for broker confirmation.
+func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error {
+	c, err := cp.channel(q.name)
+	if err != nil {
+		return err
+	}
+
+	qKey := q.key
+
+	if delay != 0 {
+		delayMs := int64(delay.Seconds() * 1000)
+		qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
+		qKey = qName
+
+		// expire the helper queue itself shortly after the delay passes
+		err := q.declare(cp, qName, qName, amqp.Table{
+			"x-dead-letter-exchange":    q.exchange,
+			"x-dead-letter-routing-key": q.name,
+			"x-message-ttl":             delayMs,
+			"x-expires":                 delayMs * 2,
+		})
+
+		if err != nil {
+			return err
+		}
+	}
+
+	err = c.ch.Publish(
+		q.exchange, // exchange
+		qKey,       // routing key
+		false,      // mandatory
+		false,      // immediate
+		amqp.Publishing{
+			ContentType:  "application/octet-stream",
+			Body:         j.Body(),
+			DeliveryMode: amqp.Persistent,
+			Headers:      pack(id, attempt, j),
+		},
+	)
+
+	if err != nil {
+		return cp.closeChan(c, err)
+	}
+
+	confirmed, ok := <-c.confirm
+	if !ok {
+		// previously this fell through and reported "failed to publish: 0"
+		// (the zero DeliveryTag of an empty struct), masking the real cause
+		return fmt.Errorf("failed to publish: confirmation channel closed")
+	}
+	if confirmed.Ack {
+		return nil
+	}
+
+	return fmt.Errorf("failed to publish: broker nack for delivery %v", confirmed.DeliveryTag)
+}
+
+// declare ensures the exchange, the queue and the binding between them all
+// exist, using a pooled channel that is left open on success.
+func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error {
+	ch, err := cp.channel(q.name)
+	if err != nil {
+		return err
+	}
+
+	if err := ch.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil); err != nil {
+		return cp.closeChan(ch, err)
+	}
+
+	if _, err := ch.ch.QueueDeclare(queue, true, false, false, false, args); err != nil {
+		return cp.closeChan(ch, err)
+	}
+
+	if err := ch.ch.QueueBind(queue, key, q.exchange, false, nil); err != nil {
+		return cp.closeChan(ch, err)
+	}
+
+	// keep channel open for reuse
+	return nil
+}
+
+// inspect fetches queue counters from the broker over a dedicated channel.
+func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
+	ch, err := cp.channel("stat")
+	if err != nil {
+		return nil, err
+	}
+
+	info, err := ch.ch.QueueInspect(q.name)
+	if err != nil {
+		return nil, cp.closeChan(ch, err)
+	}
+
+	// keep channel open for reuse
+	return &info, nil
+}
+
+// report forwards a non-nil error to the listener as a pipeline error.
+func (q *queue) report(err error) {
+	if err == nil {
+		return
+	}
+	q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
+}
diff --git a/plugins/jobs/broker/amqp/stat_test.go b/plugins/jobs/broker/amqp/stat_test.go
new file mode 100644
index 00000000..ef19746c
--- /dev/null
+++ b/plugins/jobs/broker/amqp/stat_test.go
@@ -0,0 +1,63 @@
+package amqp
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "testing"
+)
+
+// TestBroker_Stat verifies Queue/Active counters before, during and after a
+// job is consumed.
+func TestBroker_Stat(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// the Register error was silently dropped here, unlike in the sibling
+	// tests of this package
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	// one message queued, none being processed yet
+	stat, err := b.Stat(pipe)
+	assert.NoError(t, err)
+	assert.Equal(t, int64(1), stat.Queue)
+	assert.Equal(t, int64(0), stat.Active)
+
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	exec <- func(id string, j *jobs.Job) error {
+		defer wg.Done()
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+
+		// while the handler runs, the job counts as active
+		stat, err := b.Stat(pipe)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(1), stat.Active)
+
+		return nil
+	}
+
+	wg.Wait()
+	stat, err = b.Stat(pipe)
+	assert.NoError(t, err)
+	assert.Equal(t, int64(0), stat.Queue)
+	assert.Equal(t, int64(0), stat.Active)
+}
diff --git a/plugins/jobs/broker/beanstalk/broker.go b/plugins/jobs/broker/beanstalk/broker.go
new file mode 100644
index 00000000..dc3ea518
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/broker.go
@@ -0,0 +1,185 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
+// Broker serves beanstalk pipelines: it owns the shared connection, the
+// registered tubes and the serve/stop lifecycle signalling channels.
+type Broker struct {
+	cfg     *Config
+	lsn     func(event int, ctx interface{}) // event listener, may be nil
+	mu      sync.Mutex                       // guards all fields below
+	wait    chan error                       // non-nil while Serve is running; closed by Stop
+	stopped chan interface{}                 // closed when Serve has fully returned
+	conn    *conn
+	tubes   map[*jobs.Pipeline]*tube
+}
+
+// Listen attaches server event watcher. Not synchronized; expected to be
+// called once before Serve.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+	b.lsn = lsn
+}
+
+// Init configures the broker and allocates the pipeline-to-tube registry.
+func (b *Broker) Init(cfg *Config) (bool, error) {
+	b.cfg = cfg
+	b.tubes = map[*jobs.Pipeline]*tube{}
+
+	return true, nil
+}
+
+// Register attaches a pipeline to the broker as a beanstalk tube. Returns an
+// error when the pipeline is already registered or the tube is invalid.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if _, dup := b.tubes[pipe]; dup {
+		return fmt.Errorf("tube `%s` has already been registered", pipe.Name())
+	}
+
+	t, err := newTube(pipe, b.throw)
+	if err != nil {
+		return err
+	}
+	b.tubes[pipe] = t
+
+	return nil
+}
+
+// Serve connects to beanstalk, starts serving all consumable tubes and
+// blocks until Stop closes the wait channel.
+func (b *Broker) Serve() (err error) {
+	b.mu.Lock()
+
+	if b.conn, err = b.cfg.newConn(); err != nil {
+		// previously the mutex was left locked on this path, deadlocking
+		// any subsequent Stop/Push/Stat call
+		b.mu.Unlock()
+		return err
+	}
+	defer b.conn.Close()
+
+	for _, t := range b.tubes {
+		tt := t
+		if tt.execPool != nil {
+			go tt.serve(b.cfg)
+		}
+	}
+
+	b.wait = make(chan error)
+	b.stopped = make(chan interface{})
+	defer close(b.stopped)
+
+	b.mu.Unlock()
+
+	b.throw(jobs.EventBrokerReady, b)
+
+	return <-b.wait
+}
+
+// Stop stops all pipelines and blocks until Serve has fully returned.
+// No-op when the broker was never started.
+func (b *Broker) Stop() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait == nil {
+		return
+	}
+
+	for _, t := range b.tubes {
+		t.stop()
+	}
+
+	// closing wait unblocks Serve; stopped confirms it has exited
+	close(b.wait)
+	<-b.stopped
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	t, ok := b.tubes[pipe]
+	if !ok {
+		return fmt.Errorf("undefined tube `%s`", pipe.Name())
+	}
+
+	// stop the previous consumer (if any) before swapping the pool
+	t.stop()
+
+	t.execPool = execPool
+	t.errHandler = errHandler
+
+	// if already serving, (re)start the tube immediately
+	// NOTE(review): Serve starts tubes with tt.serve(b.cfg) while this path
+	// uses connFactory(b.cfg) — confirm both are intended (connFactory is
+	// defined in another file)
+	if b.conn != nil {
+		tt := t
+		if tt.execPool != nil {
+			go tt.serve(connFactory(b.cfg))
+		}
+	}
+
+	return nil
+}
+
+// Push serializes the job and puts it into the pipeline's tube. Fails when
+// the broker is not serving or the pipeline is unknown.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+	if err := b.isServing(); err != nil {
+		return "", err
+	}
+
+	t := b.tube(pipe)
+	if t == nil {
+		return "", fmt.Errorf("undefined tube `%s`", pipe.Name())
+	}
+
+	data, err := pack(j)
+	if err != nil {
+		return "", err
+	}
+
+	// NOTE(review): b.conn is read here without holding b.mu — presumably
+	// safe because conn is only written in Serve before wait is set; confirm
+	return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration())
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+	if err := b.isServing(); err != nil {
+		return nil, err
+	}
+
+	t := b.tube(pipe)
+	if t == nil {
+		return nil, fmt.Errorf("undefined tube `%s`", pipe.Name())
+	}
+
+	return t.stat(b.conn)
+}
+
+// isServing returns an error unless Serve has been started.
+func (b *Broker) isServing() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait != nil {
+		return nil
+	}
+
+	return fmt.Errorf("broker is not running")
+}
+
+// tube returns the tube associated with the pipeline, or nil when the
+// pipeline was never registered.
+func (b *Broker) tube(pipe *jobs.Pipeline) *tube {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// a missing key yields the nil zero value, matching the explicit
+	// ok-check the original spelled out
+	return b.tubes[pipe]
+}
+
+// throw forwards an event to the attached listener, if one is set.
+func (b *Broker) throw(event int, ctx interface{}) {
+	if b.lsn == nil {
+		return
+	}
+	b.lsn(event, ctx)
+}
diff --git a/plugins/jobs/broker/beanstalk/broker_test.go b/plugins/jobs/broker/beanstalk/broker_test.go
new file mode 100644
index 00000000..cd2132af
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/broker_test.go
@@ -0,0 +1,276 @@
+package beanstalk
+
+import (
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+var (
+	// pipe is the shared pipeline fixture used across the package's tests
+	pipe = &jobs.Pipeline{
+		"broker": "beanstalk",
+		"name":   "default",
+		"tube":   "test",
+	}
+
+	// cfg points at a locally running beanstalkd
+	cfg = &Config{
+		Addr: "tcp://localhost:11300",
+	}
+)
+
+// init drains ready jobs left behind by previous runs.
+// NOTE(review): it purges tube "testTube" while the pipe fixture uses tube
+// "test" — confirm this mismatch is intentional.
+func init() {
+	conn, err := beanstalk.Dial("tcp", "localhost:11300")
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close()
+
+	t := beanstalk.Tube{Name: "testTube", Conn: conn}
+
+	for {
+		id, _, err := t.PeekReady()
+		if id == 0 || err != nil {
+			break
+		}
+
+		if err := conn.Delete(id); err != nil {
+			panic(err)
+		}
+	}
+}
+
+// TestBroker_Init checks that Init reports readiness without error.
+func TestBroker_Init(t *testing.T) {
+	b := &Broker{}
+	ok, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.True(t, ok)
+	assert.NoError(t, err)
+}
+
+// TestBroker_StopNotStarted verifies Stop is a safe no-op before Serve.
+func TestBroker_StopNotStarted(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	b.Stop()
+}
+
+// TestBroker_Register checks that a valid pipeline registers cleanly.
+func TestBroker_Register(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+}
+
+// TestBroker_Register_Twice checks duplicate registration is rejected.
+func TestBroker_Register_Twice(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+	assert.Error(t, b.Register(pipe))
+}
+
+// TestBroker_Register_Invalid checks a pipeline without a tube is rejected.
+func TestBroker_Register_Invalid(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Error(t, b.Register(&jobs.Pipeline{
+		"broker": "beanstalk",
+		"name":   "default",
+	}))
+}
+
+// TestBroker_Consume_Nil_BeforeServe: resetting consuming (nil pool) before
+// Serve must succeed for a registered pipeline.
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_Undefined: consuming an unregistered pipeline fails.
+func TestBroker_Consume_Undefined(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_BeforeServe: a real exec pool may be attached before
+// the broker starts serving.
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+// TestBroker_Consume_Serve_Nil_Stop: serve/stop cycle with consuming reset
+// (nil pool) must terminate cleanly.
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = b.Consume(pipe, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_Consume_Serve_Error: Serve must fail fast on an unreachable
+// beanstalk address.
+func TestBroker_Consume_Serve_Error(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(&Config{
+		Addr: "tcp://localhost:11399",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Error(t, b.Serve())
+}
+
+// TestBroker_Consume_Serve_Stop: serve/stop cycle with an attached exec
+// pool must terminate cleanly.
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	err = b.Consume(pipe, exec, errf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_PushToNotRunning: Push fails while the broker is not serving.
+func TestBroker_PushToNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRunning: Stat fails while the broker is not serving.
+func TestBroker_StatNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
+
+// TestBroker_PushToNotRegistered: Push fails for an unregistered pipeline
+// even when the broker is serving.
+func TestBroker_PushToNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRegistered: Stat fails for an unregistered pipeline
+// even when the broker is serving.
+func TestBroker_StatNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/beanstalk/config.go b/plugins/jobs/broker/beanstalk/config.go
new file mode 100644
index 00000000..3e48a2d7
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/config.go
@@ -0,0 +1,50 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/service"
+ "strings"
+ "time"
+)
+
// Config defines beanstalk broker configuration.
type Config struct {
	// Addr of beanstalk server in DSN form, e.g. tcp://localhost:11300
	// or unix://beanstalk.sock (see newConn).
	Addr string

	// Timeout to allocate the connection, in seconds. Default 10 seconds.
	Timeout int
}
+
+// Hydrate config values.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.Addr == "" {
+ return fmt.Errorf("beanstalk address is missing")
+ }
+
+ return nil
+}
+
+// TimeoutDuration returns number of seconds allowed to allocate the connection.
+func (c *Config) TimeoutDuration() time.Duration {
+ timeout := c.Timeout
+ if timeout == 0 {
+ timeout = 10
+ }
+
+ return time.Duration(timeout) * time.Second
+}
+
// newConn parses the configured DSN (scheme://address) and opens a new
// beanstalk connection with the configured timeout.
func (c *Config) newConn() (*conn, error) {
	dsn := strings.Split(c.Addr, "://")
	if len(dsn) != 2 {
		return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)")
	}

	return newConn(dsn[0], dsn[1], c.TimeoutDuration())
}
diff --git a/plugins/jobs/broker/beanstalk/config_test.go b/plugins/jobs/broker/beanstalk/config_test.go
new file mode 100644
index 00000000..4ba08a04
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/config_test.go
@@ -0,0 +1,47 @@
+package beanstalk
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// mockCfg is a service.Config stub that unmarshals a fixed JSON payload.
type mockCfg struct{ cfg string }

func (cfg *mockCfg) Get(name string) service.Config  { return nil }
func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
// TestConfig_Hydrate_Error ensures Hydrate fails on malformed JSON.
func TestConfig_Hydrate_Error(t *testing.T) {
	cfg := &mockCfg{`{"dead`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// TestConfig_Hydrate_Error2 ensures Hydrate rejects an empty address.
func TestConfig_Hydrate_Error2(t *testing.T) {
	cfg := &mockCfg{`{"addr":""}`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}
+
// TestConfig_Hydrate_Error3 ensures newConn rejects an address without a
// scheme separator.
func TestConfig_Hydrate_Error3(t *testing.T) {
	cfg := &mockCfg{`{"addr":"tcp"}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	_, err := c.newConn()
	assert.Error(t, err)
}
+
// TestConfig_Hydrate_Error4 ensures dialing an inert unix socket fails.
func TestConfig_Hydrate_Error4(t *testing.T) {
	cfg := &mockCfg{`{"addr":"unix://sock.bean"}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	_, err := c.newConn()
	assert.Error(t, err)
}
diff --git a/plugins/jobs/broker/beanstalk/conn.go b/plugins/jobs/broker/beanstalk/conn.go
new file mode 100644
index 00000000..7aba6bbb
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/conn.go
@@ -0,0 +1,180 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "strings"
+ "sync"
+ "time"
+)
+
// connErrors lists substrings of error messages that indicate a dead
// socket (see isConnError).
var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}

// connFactory creates new connections.
type connFactory interface {
	newConn() (*conn, error)
}
+
// conn protects allocation for one connection between
// threads and provides reconnecting capabilities.
type conn struct {
	tout  time.Duration    // connection allocation timeout
	conn  *beanstalk.Conn  // underlying connection, replaced on redial
	alive bool             // set to false once the watcher has shut down
	free  chan interface{} // token channel: holds one value while conn is usable
	dead  chan interface{} // signals the watcher that a redial is required
	stop  chan interface{} // closed by Close to terminate the watcher
	lock  *sync.Cond       // lets Close wait for the watcher shutdown
}
+
// newConn creates a new beanstalk connection and starts the background
// reconnect watcher goroutine (stopped via Close).
func newConn(network, addr string, tout time.Duration) (cn *conn, err error) {
	cn = &conn{
		tout:  tout,
		alive: true,
		free:  make(chan interface{}, 1),
		dead:  make(chan interface{}, 1),
		stop:  make(chan interface{}),
		lock:  sync.NewCond(&sync.Mutex{}),
	}

	cn.conn, err = beanstalk.Dial(network, addr)
	if err != nil {
		return nil, err
	}

	// the watcher publishes the initial free token and handles redials
	go cn.watch(network, addr)

	return cn, nil
}
+
// Close terminates the reconnect watcher and the underlying connection,
// blocking until the watcher has fully shut down.
func (cn *conn) Close() error {
	cn.lock.L.Lock()
	defer cn.lock.L.Unlock()

	close(cn.stop)
	// wait for the watcher to close the socket and signal completion
	for cn.alive {
		cn.lock.Wait()
	}

	return nil
}
+
// acquire obtains the connection instance or returns an error in case of
// timeout. When mandatory is set to true the timeout is not applied.
func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) {
	// do not apply timeout on mandatory connections
	if mandatory {
		select {
		case <-cn.stop:
			return nil, fmt.Errorf("connection closed")
		case <-cn.free:
			return cn.conn, nil
		}
	}

	select {
	case <-cn.stop:
		return nil, fmt.Errorf("connection closed")
	case <-cn.free:
		return cn.conn, nil
	default:
		// connection is busy or being redialed: wait with a timeout
		// *2 to handle commands called right after the connection reset
		tout := time.NewTimer(cn.tout * 2)
		select {
		case <-cn.stop:
			tout.Stop()
			return nil, fmt.Errorf("connection closed")
		case <-cn.free:
			tout.Stop()
			return cn.conn, nil
		case <-tout.C:
			return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout)
		}
	}
}
+
+// release acquired connection.
+func (cn *conn) release(err error) error {
+ if isConnError(err) {
+ // reconnect is required
+ cn.dead <- err
+ } else {
+ cn.free <- nil
+ }
+
+ return err
+}
+
// watch publishes the initial free token, then redials the server whenever
// a dead signal arrives, and performs the final shutdown on stop.
func (cn *conn) watch(network, addr string) {
	cn.free <- nil
	// throttle ticker so redial attempts are rate limited
	t := time.NewTicker(WatchThrottleLimit)
	defer t.Stop()
	for {
		select {
		case <-cn.dead:
			// simple throttle limiter
			<-t.C
			// try to reconnect
			// TODO add logging here
			expb := backoff.NewExponentialBackOff()
			expb.MaxInterval = cn.tout

			reconnect := func() error {
				conn, err := beanstalk.Dial(network, addr)
				if err != nil {
					fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
					return err
				}

				// TODO ADD LOGGING
				fmt.Println("------beanstalk successfully redialed------")

				// swap the connection and release a fresh free token
				cn.conn = conn
				cn.free <- nil
				return nil
			}

			err := backoff.Retry(reconnect, expb)
			if err != nil {
				fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
				// keep the connection marked dead for another attempt
				cn.dead <- nil
			}

		case <-cn.stop:
			cn.lock.L.Lock()
			// drain whichever token is pending so nothing is left behind
			select {
			case <-cn.dead:
			case <-cn.free:
			}

			// stop underlying connection
			cn.conn.Close()
			// let Close() return
			cn.alive = false
			cn.lock.Signal()

			cn.lock.L.Unlock()

			return
		}
	}
}
+
+// isConnError indicates that error is related to dead socket.
+func isConnError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ for _, errStr := range connErrors {
+ // golang...
+ if strings.Contains(err.Error(), errStr) {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/plugins/jobs/broker/beanstalk/constants.go b/plugins/jobs/broker/beanstalk/constants.go
new file mode 100644
index 00000000..84be305e
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/constants.go
@@ -0,0 +1,6 @@
+package beanstalk
+
+import "time"
+
// WatchThrottleLimit caps how often the connection watcher may attempt to
// redial a dead beanstalk server (at most once per interval).
const WatchThrottleLimit = time.Second
diff --git a/plugins/jobs/broker/beanstalk/consume_test.go b/plugins/jobs/broker/beanstalk/consume_test.go
new file mode 100644
index 00000000..b16866ae
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/consume_test.go
@@ -0,0 +1,242 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// TestBroker_Consume_Job verifies that a pushed job reaches a consumer
// registered before Serve, with id and payload intact.
func TestBroker_Consume_Job(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_ConsumeAfterStart_Job verifies that a consumer attached after
// Serve has started still receives pushed jobs.
func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	// consumer registered after Serve was launched
	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Consume_Delayed verifies that a job pushed with Delay=1 is
// delivered no sooner than one second and within two seconds.
func TestBroker_Consume_Delayed(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Delay: 1},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	start := time.Now()
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob

	// delivery must respect the one second delay window
	elapsed := time.Since(start)
	assert.True(t, elapsed >= time.Second)
	assert.True(t, elapsed < 2*time.Second)
}
+
// TestBroker_Consume_Errored verifies that a handler error is forwarded to
// the pipeline error handler with the original message.
func TestBroker_Consume_Errored(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	errHandled := make(chan interface{})
	errHandler := func(id string, j *jobs.Job, err error) {
		assert.Equal(t, "job failed", err.Error())
		close(errHandled)
	}

	exec := make(chan jobs.Handler, 1)

	assert.NoError(t, b.Consume(pipe, exec, errHandler))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return fmt.Errorf("job failed")
	}

	<-waitJob
	<-errHandled
}
+
// TestBroker_Consume_Errored_Attempts verifies that a failing job with
// Attempts=3 is retried and the error handler fires exactly three times.
func TestBroker_Consume_Errored_Attempts(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	attempts := 0
	errHandled := make(chan interface{})
	errHandler := func(id string, j *jobs.Job, err error) {
		assert.Equal(t, "job failed", err.Error())
		attempts++
		errHandled <- nil
	}

	exec := make(chan jobs.Handler, 1)

	assert.NoError(t, b.Consume(pipe, exec, errHandler))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Attempts: 3},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	// the handler stays in the pool and keeps failing on every redelivery
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		return fmt.Errorf("job failed")
	}

	<-errHandled
	<-errHandled
	<-errHandled
	assert.Equal(t, 3, attempts)
}
diff --git a/plugins/jobs/broker/beanstalk/durability_test.go b/plugins/jobs/broker/beanstalk/durability_test.go
new file mode 100644
index 00000000..499a5206
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/durability_test.go
@@ -0,0 +1,575 @@
+package beanstalk
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
var (
	// proxyCfg points the broker at the local tcpProxy instead of the
	// real beanstalk server, with a short connection timeout.
	proxyCfg = &Config{
		Addr:    "tcp://localhost:11301",
		Timeout: 1,
	}

	// proxy forwards localhost:11301 to the real server on localhost:11300
	// so tests can sever connections on demand.
	proxy = &tcpProxy{
		listen:   "localhost:11301",
		upstream: "localhost:11300",
		accept:   true,
	}
)
+
// tcpProxy is a controllable TCP forwarder used to simulate dead or
// interrupted beanstalk connections in durability tests.
type tcpProxy struct {
	listen   string     // address the proxy listens on
	upstream string     // address of the real beanstalk server
	mu       sync.Mutex // guards accept and conn
	accept   bool       // when false, incoming connections are rejected
	conn     []net.Conn // all live connections (client and upstream pairs)
}
+
+func (p *tcpProxy) serve() {
+ l, err := net.Listen("tcp", p.listen)
+ if err != nil {
+ panic(err)
+ }
+
+ for {
+ in, err := l.Accept()
+ if err != nil {
+ panic(err)
+ }
+
+ if !p.accepting() {
+ in.Close()
+ }
+
+ up, err := net.Dial("tcp", p.upstream)
+ if err != nil {
+ panic(err)
+ }
+
+ go io.Copy(in, up)
+ go io.Copy(up, in)
+
+ p.mu.Lock()
+ p.conn = append(p.conn, in, up)
+ p.mu.Unlock()
+ }
+}
+
// waitConn re-enables accepting and blocks until at least count proxied
// connections exist (each counts as a client+upstream pair in p.conn).
func (p *tcpProxy) waitConn(count int) *tcpProxy {
	p.mu.Lock()
	p.accept = true
	p.mu.Unlock()

	for {
		p.mu.Lock()
		current := len(p.conn)
		p.mu.Unlock()

		// *2 because every proxied connection stores two net.Conn entries
		if current >= count*2 {
			break
		}

		time.Sleep(time.Millisecond)
	}

	return p
}
+
// reset closes every tracked connection, sets the accept flag, and returns
// the number of proxied connection pairs that were severed.
func (p *tcpProxy) reset(accept bool) int {
	p.mu.Lock()
	p.accept = accept
	defer p.mu.Unlock()

	count := 0
	for _, conn := range p.conn {
		conn.Close()
		count++
	}

	p.conn = nil
	// two net.Conn entries per proxied connection
	return count / 2
}
+
+func (p *tcpProxy) accepting() bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.accept
+}
+
// start the shared proxy once for the whole test package
func init() {
	go proxy.serve()
}
+
// TestBroker_Durability_Base verifies normal push/consume through the proxy
// with both broker connections established.
func TestBroker_Durability_Base(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	// expect 2 connections (shared + consume)
	proxy.waitConn(2)

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Durability_Consume verifies that Push fails while connections
// are severed and succeeds again after the proxy is restored.
func TestBroker_Durability_Consume(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	// sever all connections and stop accepting new ones
	proxy.waitConn(2).reset(false)

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore
	proxy.waitConn(2)

	jid, perr = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		done[id] = true
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	// poll until the pipeline drains
	for {
		st, err := b.Stat(pipe)
		if err != nil {
			continue
		}

		// wait till pipeline is empty
		if st.Queue+st.Active == 0 {
			return
		}
	}
}
+
// TestBroker_Durability_Consume_LongTimeout verifies recovery after an
// extended outage that outlasts the configured connection timeout.
func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(1).reset(false)

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// reoccurring failure while still down
	jid, perr = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore after the outage exceeded the 1s connection timeout
	time.Sleep(3 * time.Second)
	proxy.waitConn(1)

	jid, perr = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NotEqual(t, "0", jid)

	assert.NoError(t, perr)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	// spin until the job has been handled
	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 1 {
			break
		}
	}
}
+
// TestBroker_Durability_Consume2 verifies that Stat auto-reconnects after
// the proxy severs connections mid-run.
func TestBroker_Durability_Consume2(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(2).reset(false)

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore
	proxy.waitConn(2)

	jid, perr = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	st, serr := b.Stat(pipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(1), st.Queue+st.Active)

	// drop the connections again
	proxy.reset(true)

	// auto-reconnect
	_, serr = b.Stat(pipe)
	assert.NoError(t, serr)

	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		done[id] = true
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	// poll until the pipeline drains
	for {
		st, err := b.Stat(pipe)
		if err != nil {
			continue
		}

		// wait till pipeline is empty
		if st.Queue+st.Active == 0 {
			return
		}
	}
}
+
// TestBroker_Durability_Consume3 verifies a plain push/consume/drain cycle
// through the proxy without any induced failures.
func TestBroker_Durability_Consume3(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(2)

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	st, serr := b.Stat(pipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(1), st.Queue+st.Active)

	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		done[id] = true
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	// poll until the pipeline drains
	for {
		st, err := b.Stat(pipe)
		if err != nil {
			continue
		}

		// wait till pipeline is empty
		if st.Queue+st.Active == 0 {
			return
		}
	}
}
+
// TestBroker_Durability_Consume4 verifies that jobs survive a connection
// reset triggered from inside a handler ("kill" payload) and the pipeline
// still drains completely.
func TestBroker_Durability_Consume4(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(2)

	_, err = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "kill",
		Options: &jobs.Options{},
	})
	if err != nil {
		t.Fatal(err)
	}

	_, err = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})
	if err != nil {
		t.Fatal(err)
	}

	_, err = b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})
	if err != nil {
		t.Fatal(err)
	}

	st, serr := b.Stat(pipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(3), st.Queue+st.Active)

	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		done[id] = true
		// sever all proxied connections while the job is being processed
		if j.Payload == "kill" {
			proxy.reset(true)
		}

		return nil
	}

	// poll until the pipeline drains
	for {
		st, err := b.Stat(pipe)
		if err != nil {
			continue
		}

		// wait till pipeline is empty
		if st.Queue+st.Active == 0 {
			return
		}
	}
}
+
// TestBroker_Durability_StopDead ensures Stop does not hang when every
// connection to the server is already dead.
func TestBroker_Durability_StopDead(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()

	<-ready

	// kill all connections and refuse new ones, then stop
	proxy.waitConn(2).reset(false)

	b.Stop()
}
diff --git a/plugins/jobs/broker/beanstalk/job.go b/plugins/jobs/broker/beanstalk/job.go
new file mode 100644
index 00000000..fd9c8c3c
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/job.go
@@ -0,0 +1,24 @@
+package beanstalk
+
+import (
+ "bytes"
+ "encoding/gob"
+ "github.com/spiral/jobs/v2"
+)
+
+func pack(j *jobs.Job) ([]byte, error) {
+ b := new(bytes.Buffer)
+ err := gob.NewEncoder(b).Encode(j)
+ if err != nil {
+ return nil, err
+ }
+
+ return b.Bytes(), nil
+}
+
+func unpack(data []byte) (*jobs.Job, error) {
+ j := &jobs.Job{}
+ err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j)
+
+ return j, err
+}
diff --git a/plugins/jobs/broker/beanstalk/sock.bean b/plugins/jobs/broker/beanstalk/sock.bean
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/sock.bean
diff --git a/plugins/jobs/broker/beanstalk/stat_test.go b/plugins/jobs/broker/beanstalk/stat_test.go
new file mode 100644
index 00000000..14a55859
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/stat_test.go
@@ -0,0 +1,66 @@
+package beanstalk
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// TestBroker_Stat verifies queue/active counters before, during, and after
// a job is consumed.
func TestBroker_Stat(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	b.Register(pipe)

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	// beanstalk reserves job right after push
	time.Sleep(time.Millisecond * 100)

	// no consumer yet: job should be counted as queued
	stat, err := b.Stat(pipe)
	assert.NoError(t, err)
	assert.Equal(t, int64(1), stat.Queue)
	assert.Equal(t, int64(0), stat.Active)

	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		// while the handler runs, the job is reserved (active)
		stat, err := b.Stat(pipe)
		assert.NoError(t, err)
		assert.Equal(t, int64(0), stat.Queue)
		assert.Equal(t, int64(1), stat.Active)

		close(waitJob)
		return nil
	}

	<-waitJob

	stat, err = b.Stat(pipe)
	assert.NoError(t, err)
	assert.Equal(t, int64(0), stat.Queue)
}
diff --git a/plugins/jobs/broker/beanstalk/tube.go b/plugins/jobs/broker/beanstalk/tube.go
new file mode 100644
index 00000000..9d7ad117
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/tube.go
@@ -0,0 +1,250 @@
+package beanstalk
+
+import (
+ "fmt"
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// tube wraps a single beanstalk tube with producer and consumer state.
type tube struct {
	active  int32           // 1 while serve() is running (atomic)
	pipe    *jobs.Pipeline  // owning pipeline configuration
	mut     sync.Mutex      // guards producer tube and its connection
	tube    *beanstalk.Tube // producer side (put, stats)
	tubeSet *beanstalk.TubeSet // consumer side (reserve)
	reserve time.Duration   // reserve polling timeout

	// tube events
	lsn func(event int, ctx interface{})

	// stop channel, closed by stop() to end consuming
	wait chan interface{}

	// active operations
	muw sync.RWMutex
	wg  sync.WaitGroup

	// exec handlers
	execPool   chan jobs.Handler
	errHandler jobs.ErrorHandler
}
+
// entry holds a single reserved beanstalk job: its numeric id and raw
// gob-encoded payload.
type entry struct {
	id   uint64
	data []byte
}

// String renders the job id as a decimal string.
func (e *entry) String() string {
	return strconv.FormatUint(e.id, 10)
}
+
// newTube creates a new tube consumer and producer for the pipeline. The
// pipeline must declare a non-empty `tube` option; `reserve` controls the
// consumer polling timeout (default one second).
func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
	if pipe.String("tube", "") == "" {
		return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
	}

	return &tube{
		pipe:    pipe,
		tube:    &beanstalk.Tube{Name: pipe.String("tube", "")},
		tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
		reserve: pipe.Duration("reserve", time.Second),
		lsn:     lsn,
	}, nil
}
+
// serve runs the consume loop on a dedicated connection until stop() is
// requested, dispatching each reserved job to a pooled handler goroutine.
func (t *tube) serve(connector connFactory) {
	// tube specific consume connection
	cn, err := connector.newConn()
	if err != nil {
		t.report(err)
		return
	}
	defer cn.Close()

	t.wait = make(chan interface{})
	atomic.StoreInt32(&t.active, 1)

	for {
		e, err := t.consume(cn)
		if err != nil {
			// reserve timeouts are routine; only surface socket failures
			if isConnError(err) {
				t.report(err)
			}
			continue
		}

		// nil entry with nil error means stop() was requested
		if e == nil {
			return
		}

		// borrow a handler from the shared pool; returned after do()
		h := <-t.execPool
		go func(h jobs.Handler, e *entry) {
			err := t.do(cn, h, e)
			t.execPool <- h
			t.wg.Done() // paired with wg.Add(1) in consume()
			t.report(err)
		}(h, e)
	}
}
+
// consume reserves a single job from the tube set. It returns (nil, nil)
// once stop() has been requested.
func (t *tube) consume(cn *conn) (*entry, error) {
	// muw lets stop() wait out an in-flight reservation
	t.muw.Lock()
	defer t.muw.Unlock()

	select {
	case <-t.wait:
		return nil, nil
	default:
		conn, err := cn.acquire(false)
		if err != nil {
			return nil, err
		}

		t.tubeSet.Conn = conn

		id, data, err := t.tubeSet.Reserve(t.reserve)
		// release regardless of outcome; a conn error triggers a redial
		cn.release(err)

		if err != nil {
			return nil, err
		}

		// job handed to a worker goroutine; matched by wg.Done in serve()
		t.wg.Add(1)
		return &entry{id: id, data: data}, nil
	}
}
+
+// do data
+func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
+ j, err := unpack(e.data)
+ if err != nil {
+ return err
+ }
+
+ err = h(e.String(), j)
+
+ // mandatory acquisition
+ conn, connErr := cn.acquire(true)
+ if connErr != nil {
+ // possible if server is dead
+ return connErr
+ }
+
+ if err == nil {
+ return cn.release(conn.Delete(e.id))
+ }
+
+ stat, statErr := conn.StatsJob(e.id)
+ if statErr != nil {
+ return cn.release(statErr)
+ }
+
+ t.errHandler(e.String(), j, err)
+
+ reserves, ok := strconv.Atoi(stat["reserves"])
+ if ok != nil || !j.Options.CanRetry(reserves-1) {
+ return cn.release(conn.Bury(e.id, 0))
+ }
+
+ return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
+}
+
// stop ends tube consumption and blocks until all in-flight jobs finish.
// It is a no-op when the tube is not actively serving.
func (t *tube) stop() {
	if atomic.LoadInt32(&t.active) == 0 {
		return
	}

	atomic.StoreInt32(&t.active, 0)

	// signal consume() to stop reserving
	close(t.wait)

	// muw excludes an in-flight reservation; wg drains running handlers
	t.muw.Lock()
	t.wg.Wait()
	t.muw.Unlock()
}
+
+// put data into pool or return error (no wait), this method will try to reattempt operation if
+// dead conn found.
+func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
+ id, err = t.doPut(cn, attempt, data, delay, rrt)
+ if err != nil && isConnError(err) {
+ return t.doPut(cn, attempt, data, delay, rrt)
+ }
+
+ return id, err
+}
+
+// perform put operation
+func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
+ conn, err := cn.acquire(false)
+ if err != nil {
+ return "", err
+ }
+
+ var bid uint64
+
+ t.mut.Lock()
+ t.tube.Conn = conn
+ bid, err = t.tube.Put(data, 0, delay, rrt)
+ t.mut.Unlock()
+
+ return strconv.FormatUint(bid, 10), cn.release(err)
+}
+
+// return tube stats (retries)
+func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
+ stat, err = t.doStat(cn)
+ if err != nil && isConnError(err) {
+ return t.doStat(cn)
+ }
+
+ return stat, err
+}
+
// doStat queries the tube and maps beanstalk counters onto jobs.Stat
// (ready -> Queue, reserved -> Active, delayed -> Delayed).
func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
	conn, err := cn.acquire(false)
	if err != nil {
		return nil, err
	}

	t.mut.Lock()
	t.tube.Conn = conn
	values, err := t.tube.Stats()
	t.mut.Unlock()

	if err != nil {
		return nil, cn.release(err)
	}

	stat = &jobs.Stat{InternalName: t.tube.Name}

	// unparsable counters are silently left at zero
	if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
		stat.Queue = int64(v)
	}

	if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
		stat.Active = int64(v)
	}

	if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
		stat.Delayed = int64(v)
	}

	return stat, cn.release(nil)
}
+
+// report tube specific error
+func (t *tube) report(err error) {
+ if err != nil {
+ t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
+ }
+}
diff --git a/plugins/jobs/broker/beanstalk/tube_test.go b/plugins/jobs/broker/beanstalk/tube_test.go
new file mode 100644
index 00000000..b6a646f4
--- /dev/null
+++ b/plugins/jobs/broker/beanstalk/tube_test.go
@@ -0,0 +1,18 @@
+package beanstalk
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// TestTube_CantServe ensures serve reports the dial error through the
// listener when the connection factory yields a broken DSN.
func TestTube_CantServe(t *testing.T) {
	var gctx interface{}
	tube := &tube{
		lsn: func(event int, ctx interface{}) {
			gctx = ctx
		},
	}

	// "broken" has no scheme separator, so newConn fails immediately
	tube.serve(&Config{Addr: "broken"})
	assert.Error(t, gctx.(error))
}
diff --git a/plugins/jobs/broker/ephemeral/broker.go b/plugins/jobs/broker/ephemeral/broker.go
new file mode 100644
index 00000000..385bb175
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/broker.go
@@ -0,0 +1,174 @@
+package ephemeral
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
+// Broker run queue using local goroutines.
+type Broker struct {
+	// lsn receives broker events; nil until Listen is called.
+	lsn func(event int, ctx interface{})
+	// mu guards wait, stopped and queues.
+	mu sync.Mutex
+	// wait is non-nil while Serve is running; closing it stops Serve.
+	wait chan error
+	// stopped is closed once Serve has fully returned.
+	stopped chan interface{}
+	// queues maps registered pipelines to their in-memory queues.
+	queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher. The listener is invoked by throw for
+// every broker event; passing nil disables event reporting.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+	b.lsn = lsn
+}
+
+// Init configures broker and allocates the pipeline registry. It always
+// reports that the broker is enabled.
+func (b *Broker) Init() (bool, error) {
+	b.queues = map[*jobs.Pipeline]*queue{}
+	return true, nil
+}
+
+// Register broker pipeline. Registering the same pipeline twice is an error.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	_, dup := b.queues[pipe]
+	if dup {
+		return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+	}
+
+	// maxThreads == 0 means unbounded concurrency
+	b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
+
+	return nil
+}
+
+// Serve broker pipelines. Queues with an exec pool attached start consuming
+// immediately; the call then blocks until Stop closes the wait channel. The
+// stopped channel is closed on exit so Stop can wait for a full shutdown.
+func (b *Broker) Serve() error {
+	// start consuming
+	b.mu.Lock()
+	for _, q := range b.queues {
+		qq := q
+		if qq.execPool != nil {
+			go qq.serve()
+		}
+	}
+	b.wait = make(chan error)
+	b.stopped = make(chan interface{})
+	defer close(b.stopped)
+
+	b.mu.Unlock()
+
+	// listener is notified only after the lock is released
+	b.throw(jobs.EventBrokerReady, b)
+
+	return <-b.wait
+}
+
+// Stop all pipelines and wait for the serving routine to exit. Stop is a
+// no-op when the broker is not serving and is safe to call more than once.
+func (b *Broker) Stop() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait == nil {
+		return
+	}
+
+	// stop all consuming
+	for _, q := range b.queues {
+		q.stop()
+	}
+
+	close(b.wait)
+	<-b.stopped
+
+	// reset serving state: without this a second Stop would close an
+	// already-closed channel (panic) and isServing would keep reporting
+	// the broker as running after shutdown
+	b.wait = nil
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	q, found := b.queues[pipe]
+	if !found {
+		return fmt.Errorf("undefined queue `%s`", pipe.Name())
+	}
+
+	// halt the previous consumer (if any) before swapping the handlers
+	q.stop()
+	q.execPool = execPool
+	q.errHandler = errHandler
+
+	// broker already serving? restart consumption with the new pool
+	if b.wait != nil && q.execPool != nil {
+		go q.serve()
+	}
+
+	return nil
+}
+
+// Push job into the worker. The broker must be serving and the pipeline
+// registered; the returned string is the generated job identifier.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+	if err := b.isServing(); err != nil {
+		return "", err
+	}
+
+	q := b.queue(pipe)
+	if q == nil {
+		return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+	}
+
+	// every job is identified by a fresh UUID
+	uid, err := uuid.NewV4()
+	if err != nil {
+		return "", err
+	}
+	id := uid.String()
+
+	q.push(id, j, 0, j.Options.DelayDuration())
+
+	return id, nil
+}
+
+// Stat must consume statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+	if err = b.isServing(); err != nil {
+		return nil, err
+	}
+
+	if q := b.queue(pipe); q != nil {
+		return q.stat(), nil
+	}
+
+	return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+}
+
+// isServing returns nil while the broker is serving, an error otherwise.
+func (b *Broker) isServing() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait != nil {
+		return nil
+	}
+
+	return fmt.Errorf("broker is not running")
+}
+
+// queue returns the queue associated with the pipeline, or nil when the
+// pipeline was never registered.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// missing keys yield the nil zero value
+	return b.queues[pipe]
+}
+
+// throw handles service, server and pool events, ignoring them silently
+// when no listener is attached.
+func (b *Broker) throw(event int, ctx interface{}) {
+	if b.lsn == nil {
+		return
+	}
+
+	b.lsn(event, ctx)
+}
diff --git a/plugins/jobs/broker/ephemeral/broker_test.go b/plugins/jobs/broker/ephemeral/broker_test.go
new file mode 100644
index 00000000..c1b40276
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/broker_test.go
@@ -0,0 +1,221 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+// pipe is the pipeline definition shared by every test in this package.
+var (
+	pipe = &jobs.Pipeline{
+		"broker": "local",
+		"name":   "default",
+	}
+)
+
+// TestBroker_Init verifies Init succeeds for the in-memory broker.
+func TestBroker_Init(t *testing.T) {
+	b := &Broker{}
+	ok, err := b.Init()
+	assert.True(t, ok)
+	assert.NoError(t, err)
+}
+
+// TestBroker_StopNotStarted verifies Stop is a safe no-op before Serve.
+func TestBroker_StopNotStarted(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	b.Stop()
+}
+
+// TestBroker_Register verifies a pipeline can be registered once.
+func TestBroker_Register(t *testing.T) {
+	b := &Broker{}
+	// check the init error instead of discarding it, matching sibling tests
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+}
+
+// TestBroker_Register_Twice verifies duplicate registration is rejected.
+func TestBroker_Register_Twice(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+	assert.Error(t, b.Register(pipe))
+}
+
+// TestBroker_Consume_Nil_BeforeServe verifies consuming can be disabled
+// (nil pool) before the broker starts serving.
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_Undefined verifies Consume fails for an unregistered
+// pipeline.
+func TestBroker_Consume_Undefined(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_BeforeServe verifies handlers can be attached before
+// Serve is called.
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+// TestBroker_Consume_Serve_Nil_Stop verifies the serve/stop cycle with
+// consuming disabled.
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = b.Consume(pipe, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	// give Serve a moment to start before stopping
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_Consume_Serve_Stop verifies the serve/stop cycle with an
+// attached exec pool.
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	err = b.Consume(pipe, exec, errf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	// give Serve a moment to start before stopping
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_PushToNotRunning verifies Push fails when the broker is not
+// serving.
+func TestBroker_PushToNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRunning verifies Stat fails when the broker is not
+// serving.
+func TestBroker_StatNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
+
+// TestBroker_PushToNotRegistered verifies Push fails for a pipeline that was
+// never registered, even while serving.
+func TestBroker_PushToNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRegistered verifies Stat fails for a pipeline that was
+// never registered, even while serving.
+func TestBroker_StatNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/ephemeral/consume_test.go b/plugins/jobs/broker/ephemeral/consume_test.go
new file mode 100644
index 00000000..d764a984
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/consume_test.go
@@ -0,0 +1,253 @@
+package ephemeral
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+// TestBroker_Consume_Job pushes a job and verifies the handler receives the
+// original id and payload.
+func TestBroker_Consume_Job(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	waitJob := make(chan interface{})
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		close(waitJob)
+		return nil
+	}
+
+	<-waitJob
+}
+
+// TestBroker_ConsumeAfterStart_Job is the same scenario but handlers are
+// attached after Serve is already running.
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	exec := make(chan jobs.Handler, 1)
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	waitJob := make(chan interface{})
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		close(waitJob)
+		return nil
+	}
+
+	<-waitJob
+}
+
+// TestBroker_Consume_Delayed verifies a job with Delay: 1 is executed
+// between one and two seconds after being pushed.
+func TestBroker_Consume_Delayed(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	exec := make(chan jobs.Handler, 1)
+	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{Delay: 1},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	waitJob := make(chan interface{})
+	start := time.Now()
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		close(waitJob)
+		return nil
+	}
+
+	<-waitJob
+
+	// the 1s delay must have elapsed, but well within a 2s upper bound
+	elapsed := time.Since(start)
+	assert.True(t, elapsed > time.Second)
+	assert.True(t, elapsed < 2*time.Second)
+}
+
+// TestBroker_Consume_Errored verifies a failing handler triggers the error
+// handler with the returned error.
+func TestBroker_Consume_Errored(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	errHandled := make(chan interface{})
+	errHandler := func(id string, j *jobs.Job, err error) {
+		assert.Equal(t, "job failed", err.Error())
+		close(errHandled)
+	}
+
+	exec := make(chan jobs.Handler, 1)
+
+	assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	waitJob := make(chan interface{})
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		close(waitJob)
+		return fmt.Errorf("job failed")
+	}
+
+	<-waitJob
+	<-errHandled
+}
+
+// TestBroker_Consume_Errored_Attempts verifies a job with Attempts: 3 is
+// retried until the retry budget is exhausted; the errHandled channel
+// synchronizes access to the attempts counter.
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	attempts := 0
+	errHandled := make(chan interface{})
+	errHandler := func(id string, j *jobs.Job, err error) {
+		assert.Equal(t, "job failed", err.Error())
+		attempts++
+		errHandled <- nil
+	}
+
+	exec := make(chan jobs.Handler, 1)
+
+	assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	jid, perr := b.Push(pipe, &jobs.Job{
+		Job:     "test",
+		Payload: "body",
+		Options: &jobs.Options{Attempts: 3},
+	})
+
+	assert.NotEqual(t, "", jid)
+	assert.NoError(t, perr)
+
+	exec <- func(id string, j *jobs.Job) error {
+		assert.Equal(t, jid, id)
+		assert.Equal(t, "body", j.Payload)
+		return fmt.Errorf("job failed")
+	}
+
+	<-errHandled
+	<-errHandled
+	<-errHandled
+	assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/broker/ephemeral/queue.go b/plugins/jobs/broker/ephemeral/queue.go
new file mode 100644
index 00000000..a24bc216
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/queue.go
@@ -0,0 +1,161 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// queue is an in-memory job pipeline with optional concurrency limiting.
+type queue struct {
+	// on is 1 while serve is active (accessed atomically).
+	on int32
+	// state holds queue/active/delayed counters (fields accessed atomically).
+	state *jobs.Stat
+
+	// job pipeline
+	// concurPool is a token pool bounding concurrent handlers; nil = unbounded.
+	concurPool chan interface{}
+	jobs       chan *entry
+
+	// on operations: muw pairs consume with stop so jobs cannot be taken
+	// while the queue is draining; wg tracks in-flight jobs.
+	muw sync.Mutex
+	wg  sync.WaitGroup
+
+	// stop channel, closed by stop() to terminate serve().
+	wait chan interface{}
+
+	// exec handlers supplied via Broker.Consume.
+	execPool   chan jobs.Handler
+	errHandler jobs.ErrorHandler
+}
+
+// entry is a single queued job together with its retry attempt counter.
+type entry struct {
+	id      string
+	job     *jobs.Job
+	attempt int
+}
+
+// newQueue creates a queue; maxConcur > 0 bounds the number of jobs handled
+// at once while 0 leaves concurrency unbounded.
+func newQueue(maxConcur int) *queue {
+	q := &queue{
+		state: &jobs.Stat{},
+		jobs:  make(chan *entry),
+	}
+
+	if maxConcur == 0 {
+		return q
+	}
+
+	// pre-fill the token pool; each running job temporarily takes one token
+	q.concurPool = make(chan interface{}, maxConcur)
+	for i := 0; i < maxConcur; i++ {
+		q.concurPool <- nil
+	}
+
+	return q
+}
+
+// serve consumes jobs until stop is called. For every entry it first takes
+// a concurrency token (when bounded) and a handler from the exec pool, then
+// runs the job in its own goroutine; both token and handler are returned
+// once the job finishes.
+func (q *queue) serve() {
+	q.wait = make(chan interface{})
+	atomic.StoreInt32(&q.on, 1)
+
+	for {
+		e := q.consume()
+		if e == nil {
+			// stop was requested; wait for in-flight jobs to drain
+			q.wg.Wait()
+			return
+		}
+
+		if q.concurPool != nil {
+			<-q.concurPool
+		}
+
+		atomic.AddInt64(&q.state.Active, 1)
+		h := <-q.execPool
+
+		go func(h jobs.Handler, e *entry) {
+			defer q.wg.Done()
+
+			q.do(h, e)
+			// AddInt64 with ^int64(0) decrements the counter
+			atomic.AddInt64(&q.state.Active, ^int64(0))
+
+			q.execPool <- h
+
+			if q.concurPool != nil {
+				q.concurPool <- nil
+			}
+		}(h, e)
+	}
+}
+
+// consume blocks until a job entry becomes available or the queue is
+// stopped; it returns nil on stop. Holding muw pairs with stop so the
+// waitgroup cannot be incremented while stop is draining.
+func (q *queue) consume() *entry {
+	q.muw.Lock()
+	defer q.muw.Unlock()
+
+	select {
+	case <-q.wait:
+		return nil
+	case e := <-q.jobs:
+		// register the job before releasing muw so stop waits for it
+		q.wg.Add(1)
+
+		return e
+	}
+}
+
+// do runs a single job through the handler, reporting failures to the error
+// handler and re-queueing the job while its retry budget allows.
+func (q *queue) do(h jobs.Handler, e *entry) {
+	err := h(e.id, e.job)
+
+	if err == nil {
+		// success: drop the job from the pending counter
+		atomic.AddInt64(&q.state.Queue, ^int64(0))
+		return
+	}
+
+	q.errHandler(e.id, e.job, err)
+
+	if !e.job.Options.CanRetry(e.attempt) {
+		// retries exhausted: drop the job from the pending counter
+		atomic.AddInt64(&q.state.Queue, ^int64(0))
+		return
+	}
+
+	q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
+}
+
+// stop terminates consumption: it signals serve via the wait channel and
+// blocks until every in-flight job has completed. Calling stop when the
+// queue is not running is a no-op.
+func (q *queue) stop() {
+	if atomic.LoadInt32(&q.on) == 0 {
+		return
+	}
+
+	close(q.wait)
+
+	// muw excludes consume, so no new job can be taken while draining
+	q.muw.Lock()
+	q.wg.Wait()
+	q.muw.Unlock()
+
+	atomic.StoreInt32(&q.on, 0)
+}
+
+// push schedules the job for delivery from a background goroutine, either
+// immediately or after the given delay (implemented via time.Sleep). The
+// queue/delayed counters are kept in sync for stat reporting.
+func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
+	if delay == 0 {
+		atomic.AddInt64(&q.state.Queue, 1)
+		go func() {
+			q.jobs <- &entry{id: id, job: j, attempt: attempt}
+		}()
+
+		return
+	}
+
+	atomic.AddInt64(&q.state.Delayed, 1)
+	go func() {
+		time.Sleep(delay)
+		// move the job from delayed to pending before handing it over
+		atomic.AddInt64(&q.state.Delayed, ^int64(0))
+		atomic.AddInt64(&q.state.Queue, 1)
+
+		q.jobs <- &entry{id: id, job: j, attempt: attempt}
+	}()
+}
+
+// stat returns a point-in-time snapshot of the queue counters.
+func (q *queue) stat() *jobs.Stat {
+	snapshot := jobs.Stat{
+		InternalName: ":memory:",
+		Queue:        atomic.LoadInt64(&q.state.Queue),
+		Active:       atomic.LoadInt64(&q.state.Active),
+		Delayed:      atomic.LoadInt64(&q.state.Delayed),
+	}
+
+	return &snapshot
+}
diff --git a/plugins/jobs/broker/ephemeral/stat_test.go b/plugins/jobs/broker/ephemeral/stat_test.go
new file mode 100644
index 00000000..0894323c
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/stat_test.go
@@ -0,0 +1,64 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestBroker_Stat(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Active)
+
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+ stat, err = b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+}
diff --git a/plugins/jobs/broker/sqs/broker.go b/plugins/jobs/broker/sqs/broker.go
new file mode 100644
index 00000000..8cc62b6b
--- /dev/null
+++ b/plugins/jobs/broker/sqs/broker.go
@@ -0,0 +1,189 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
+// Broker represents SQS broker.
+type Broker struct {
+	// cfg holds connection credentials and timeouts.
+	cfg *Config
+	// sqs is the API client, allocated in Serve.
+	sqs *sqs.SQS
+	// lsn receives broker events; nil until Listen is called.
+	lsn func(event int, ctx interface{})
+	// mu guards wait, stopped and queues.
+	mu sync.Mutex
+	// wait is non-nil while Serve is running.
+	wait chan error
+	// stopped is closed once Serve has fully returned.
+	stopped chan interface{}
+	// queues maps registered pipelines to their SQS queues.
+	queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher. The listener is invoked by throw for
+// every broker event; passing nil disables event reporting.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+	b.lsn = lsn
+}
+
+// Init configures SQS broker with the provided credentials/config and
+// allocates the pipeline registry.
+func (b *Broker) Init(cfg *Config) (ok bool, err error) {
+	b.queues = map[*jobs.Pipeline]*queue{}
+	b.cfg = cfg
+
+	return true, nil
+}
+
+// Register broker pipeline. Registering the same pipeline twice, or a
+// pipeline whose queue definition is invalid, is an error.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if _, dup := b.queues[pipe]; dup {
+		return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+	}
+
+	q, err := newQueue(pipe, b.throw)
+	if err != nil {
+		return err
+	}
+	b.queues[pipe] = q
+
+	return nil
+}
+
+// Serve broker pipelines. It allocates the SQS client, resolves the URL of
+// every registered queue and starts consumers for queues with an exec pool
+// attached, then blocks until Stop is called.
+func (b *Broker) Serve() (err error) {
+	b.mu.Lock()
+
+	b.sqs, err = b.cfg.SQS()
+	if err != nil {
+		// release the mutex on the error path: returning with it held
+		// would deadlock any later Stop/Consume/Push call
+		b.mu.Unlock()
+		return err
+	}
+
+	for _, q := range b.queues {
+		q.url, err = q.declareQueue(b.sqs)
+		if err != nil {
+			b.mu.Unlock()
+			return err
+		}
+	}
+
+	for _, q := range b.queues {
+		qq := q
+		if qq.execPool != nil {
+			go qq.serve(b.sqs, b.cfg.TimeoutDuration())
+		}
+	}
+
+	b.wait = make(chan error)
+	b.stopped = make(chan interface{})
+	defer close(b.stopped)
+
+	b.mu.Unlock()
+
+	b.throw(jobs.EventBrokerReady, b)
+
+	return <-b.wait
+}
+
+// Stop all pipelines and wait for the serving routine to exit. Stop is a
+// no-op when the broker is not serving and is safe to call repeatedly.
+func (b *Broker) Stop() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait == nil {
+		return
+	}
+
+	for _, q := range b.queues {
+		q.stop()
+	}
+
+	b.wait <- nil
+	<-b.stopped
+
+	// reset serving state: without this a second Stop would block forever
+	// sending on b.wait, and isServing would keep reporting the broker as
+	// running after shutdown
+	b.wait = nil
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	q, found := b.queues[pipe]
+	if !found {
+		return fmt.Errorf("undefined queue `%s`", pipe.Name())
+	}
+
+	// halt the previous consumer (if any) before swapping the handlers
+	q.stop()
+	q.execPool = execPool
+	q.errHandler = errHandler
+
+	// restart consumption right away when the client is already connected
+	if b.sqs != nil && q.execPool != nil {
+		go q.serve(b.sqs, b.cfg.TimeoutDuration())
+	}
+
+	return nil
+}
+
+// Push job into the worker. The broker must be serving and the pipeline
+// registered; delays are validated against the SQS DelaySeconds maximum of
+// 900 seconds (15 minutes).
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+	if err := b.isServing(); err != nil {
+		return "", err
+	}
+
+	q := b.queue(pipe)
+	if q == nil {
+		return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+	}
+
+	// fail early with a clear error instead of letting the API reject it
+	if j.Options.Delay > 900 || j.Options.RetryDelay > 900 {
+		return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name())
+	}
+
+	return q.send(b.sqs, j)
+}
+
+// Stat must fetch statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+	if err = b.isServing(); err != nil {
+		return nil, err
+	}
+
+	if q := b.queue(pipe); q != nil {
+		return q.stat(b.sqs)
+	}
+
+	return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+}
+
+// isServing returns nil while the broker is serving, an error otherwise.
+func (b *Broker) isServing() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.wait != nil {
+		return nil
+	}
+
+	return fmt.Errorf("broker is not running")
+}
+
+// queue returns the queue associated with the pipeline, or nil when the
+// pipeline was never registered.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// missing keys yield the nil zero value
+	return b.queues[pipe]
+}
+
+// throw handles service, server and pool events, ignoring them silently
+// when no listener is attached.
+func (b *Broker) throw(event int, ctx interface{}) {
+	if b.lsn == nil {
+		return
+	}
+
+	b.lsn(event, ctx)
+}
diff --git a/plugins/jobs/broker/sqs/broker_test.go b/plugins/jobs/broker/sqs/broker_test.go
new file mode 100644
index 00000000..c87b302d
--- /dev/null
+++ b/plugins/jobs/broker/sqs/broker_test.go
@@ -0,0 +1,275 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+// pipe/cfg are shared by every test; cfg points at a local SQS-compatible
+// endpoint (e.g. elasticmq on :9324).
+var (
+	pipe = &jobs.Pipeline{
+		"broker": "sqs",
+		"name":   "default",
+		"queue":  "test",
+		"declare": map[string]interface{}{
+			"MessageRetentionPeriod": 86400,
+		},
+	}
+
+	cfg = &Config{
+		Key:      "api-key",
+		Secret:   "api-secret",
+		Region:   "us-west-1",
+		Endpoint: "http://localhost:9324",
+	}
+)
+
+// TestBroker_Init verifies Init succeeds with a valid config.
+func TestBroker_Init(t *testing.T) {
+	b := &Broker{}
+	ok, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.True(t, ok)
+	assert.NoError(t, err)
+}
+
+// TestBroker_StopNotStarted verifies Stop is a safe no-op before Serve.
+func TestBroker_StopNotStarted(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	b.Stop()
+}
+
+// TestBroker_Register verifies a pipeline can be registered once.
+func TestBroker_Register(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+}
+
+// TestBroker_RegisterInvalid verifies a pipeline without a queue name is
+// rejected.
+func TestBroker_RegisterInvalid(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Error(t, b.Register(&jobs.Pipeline{
+		"broker": "sqs",
+		"name":   "default",
+	}))
+}
+
+// TestBroker_Register_Twice verifies duplicate registration is rejected.
+func TestBroker_Register_Twice(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Register(pipe))
+	assert.Error(t, b.Register(pipe))
+}
+
+// TestBroker_Consume_Nil_BeforeServe verifies consuming can be disabled
+// (nil pool) before the broker starts serving.
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_Undefined verifies Consume fails for an unregistered
+// pipeline.
+func TestBroker_Consume_Undefined(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+// TestBroker_Consume_BeforeServe verifies handlers can be attached before
+// Serve is called.
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+// TestBroker_Consume_Serve_Nil_Stop verifies the serve/stop cycle with
+// consuming disabled.
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = b.Consume(pipe, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	// give Serve a moment to start before stopping
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_Consume_Serve_Stop verifies the serve/stop cycle with an
+// attached exec pool.
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	// check the consume error instead of discarding it
+	err = b.Consume(pipe, exec, errf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wait := make(chan interface{})
+	go func() {
+		assert.NoError(t, b.Serve())
+		close(wait)
+	}()
+	time.Sleep(time.Millisecond * 100)
+	b.Stop()
+
+	<-wait
+}
+
+// TestBroker_Consume_Serve_InvalidQueue verifies Serve fails when a queue
+// declaration contains invalid attributes.
+func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) {
+	pipe := &jobs.Pipeline{
+		"broker": "sqs",
+		"name":   "default",
+		"queue":  "invalid",
+		"declare": map[string]interface{}{
+			"VisibilityTimeout": "invalid",
+		},
+	}
+
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exec := make(chan jobs.Handler)
+	errf := func(id string, j *jobs.Job, err error) {}
+
+	// check the consume error instead of discarding it
+	if err := b.Consume(pipe, exec, errf); err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Error(t, b.Serve())
+}
+
+// TestBroker_PushToNotRunning verifies Push fails when the broker is not
+// serving.
+func TestBroker_PushToNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRunning verifies Stat fails when the broker is not
+// serving.
+func TestBroker_StatNotRunning(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = b.Register(pipe)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
+
+// TestBroker_PushToNotRegistered verifies Push fails for a pipeline that was
+// never registered, even while serving.
+func TestBroker_PushToNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Push(pipe, &jobs.Job{})
+	assert.Error(t, err)
+}
+
+// TestBroker_StatNotRegistered verifies Stat fails for a pipeline that was
+// never registered, even while serving.
+func TestBroker_StatNotRegistered(t *testing.T) {
+	b := &Broker{}
+	_, err := b.Init(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ready := make(chan interface{})
+	b.Listen(func(event int, ctx interface{}) {
+		if event == jobs.EventBrokerReady {
+			close(ready)
+		}
+	})
+
+	go func() { assert.NoError(t, b.Serve()) }()
+	defer b.Stop()
+
+	<-ready
+
+	_, err = b.Stat(pipe)
+	assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/sqs/config.go b/plugins/jobs/broker/sqs/config.go
new file mode 100644
index 00000000..d0c2f2b2
--- /dev/null
+++ b/plugins/jobs/broker/sqs/config.go
@@ -0,0 +1,82 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Config defines sqs broker configuration.
+type Config struct {
+	// Region defines SQS region, not required when endpoint is not empty.
+	Region string
+
+	// Key defines AWS API key, not required when endpoint is not empty.
+	// NOTE(review): Hydrate currently requires Region/Key/Secret even when
+	// Endpoint is set — confirm intended behavior.
+	Key string
+
+	// Secret defines AWS API secret, not required when endpoint is not empty.
+	Secret string
+
+	// Endpoint can be used to re-define SQS endpoint to custom location. Only for local development.
+	Endpoint string
+
+	// Timeout to allocate the connection. Default 10 seconds.
+	Timeout int
+}
+
+// Hydrate populates the config from the service container and validates the
+// mandatory credential fields.
+func (c *Config) Hydrate(cfg service.Config) error {
+	if err := cfg.Unmarshal(c); err != nil {
+		return err
+	}
+
+	// region, key and secret are all mandatory
+	switch {
+	case c.Region == "":
+		return fmt.Errorf("SQS region is missing")
+	case c.Key == "":
+		return fmt.Errorf("SQS key is missing")
+	case c.Secret == "":
+		return fmt.Errorf("SQS secret is missing")
+	}
+
+	return nil
+}
+
+// TimeoutDuration returns number of seconds allowed to allocate the connection.
+func (c *Config) TimeoutDuration() time.Duration {
+	if c.Timeout == 0 {
+		// default connection timeout
+		return 10 * time.Second
+	}
+
+	return time.Duration(c.Timeout) * time.Second
+}
+
+// Session returns new AWS session authenticated with the static key/secret
+// pair from the config.
+func (c *Config) Session() (*session.Session, error) {
+	return session.NewSession(&aws.Config{
+		Region:      aws.String(c.Region),
+		Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""),
+	})
+}
+
+// SQS returns new SQS instance or error.
+func (c *Config) SQS() (*sqs.SQS, error) {
+	sess, err := c.Session()
+	if err != nil {
+		return nil, err
+	}
+
+	// honour a custom endpoint (local development) when configured
+	if c.Endpoint != "" {
+		return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil
+	}
+
+	return sqs.New(sess), nil
+}
diff --git a/plugins/jobs/broker/sqs/config_test.go b/plugins/jobs/broker/sqs/config_test.go
new file mode 100644
index 00000000..b36b3c6f
--- /dev/null
+++ b/plugins/jobs/broker/sqs/config_test.go
@@ -0,0 +1,48 @@
+package sqs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// mockCfg is a minimal service.Config backed by a raw JSON string.
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config  { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+// Test_Config_Hydrate_Error rejects malformed JSON.
+func Test_Config_Hydrate_Error(t *testing.T) {
+	cfg := &mockCfg{`{"dead`}
+	c := &Config{}
+
+	assert.Error(t, c.Hydrate(cfg))
+}
+
+// Test_Config_Hydrate_Error2 rejects an empty config (missing region).
+func Test_Config_Hydrate_Error2(t *testing.T) {
+	cfg := &mockCfg{`{}`}
+	c := &Config{}
+
+	assert.Error(t, c.Hydrate(cfg))
+}
+
+// Test_Config_Hydrate_Error3 rejects a config without a key.
+func Test_Config_Hydrate_Error3(t *testing.T) {
+	cfg := &mockCfg{`{"region":"us-east-1"}`}
+	c := &Config{}
+
+	assert.Error(t, c.Hydrate(cfg))
+}
+
+// Test_Config_Hydrate_Error4 rejects a config without a secret.
+func Test_Config_Hydrate_Error4(t *testing.T) {
+	cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`}
+	c := &Config{}
+
+	assert.Error(t, c.Hydrate(cfg))
+}
+
+// Test_Config_Hydrate_Error5 accepts a fully specified config.
+func Test_Config_Hydrate_Error5(t *testing.T) {
+	cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`}
+	c := &Config{}
+
+	assert.NoError(t, c.Hydrate(cfg))
+}
diff --git a/plugins/jobs/broker/sqs/consume_test.go b/plugins/jobs/broker/sqs/consume_test.go
new file mode 100644
index 00000000..434fc6ea
--- /dev/null
+++ b/plugins/jobs/broker/sqs/consume_test.go
@@ -0,0 +1,370 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// TestBroker_Consume_Job verifies that a job pushed after the broker is
// ready is delivered to a consumer handler registered before Serve.
func TestBroker_Consume_Job(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}

	// closed by the listener once the broker reports readiness
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	// the handler asserts the job identity/payload and unblocks the test
	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Consume_JobUseExistedPipeline runs the same consume flow
// against a pipeline without `declare` attributes, i.e. reusing a queue
// that already exists on the server.
func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) {
	pipe := &jobs.Pipeline{
		"broker": "sqs",
		"name":   "default",
		"queue":  "test",
	}

	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Consume_PushTooBigDelay expects Push to fail when the job
// delay (901s) exceeds the SQS maximum of 900 seconds.
func TestBroker_Consume_PushTooBigDelay(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	_, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{
			Delay: 901,
		},
	})

	assert.Error(t, perr)
}
+
// TestBroker_Consume_PushTooBigDelay2 expects Push to fail when the retry
// delay (901s) exceeds the SQS maximum of 900 seconds.
func TestBroker_Consume_PushTooBigDelay2(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	_, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{
			RetryDelay: 901,
		},
	})

	assert.Error(t, perr)
}
+
// TestBroker_ConsumeAfterStart_Job verifies that a consumer registered
// after Serve has been started still receives pushed jobs.
func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	// Consume is called after Serve, unlike the base consume test.
	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Consume_Delayed pushes a job with a 1-second delay and
// asserts it is executed after at least one but less than two seconds.
func TestBroker_Consume_Delayed(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Delay: 1},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	start := time.Now()
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob

	// the delay window is measured from handler registration
	elapsed := time.Since(start)
	assert.True(t, elapsed > time.Second)
	assert.True(t, elapsed < 2*time.Second)
}
+
// TestBroker_Consume_Errored verifies the error handler receives the
// error returned by a failing job handler.
func TestBroker_Consume_Errored(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	// closed once the error handler observes the failure
	errHandled := make(chan interface{})
	errHandler := func(id string, j *jobs.Job, err error) {
		assert.Equal(t, "job failed", err.Error())
		close(errHandled)
	}

	exec := make(chan jobs.Handler, 1)

	assert.NoError(t, b.Consume(pipe, exec, errHandler))

	go func() { assert.NoError(t, b.Serve()) }()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return fmt.Errorf("job failed")
	}

	<-waitJob
	<-errHandled
	b.Stop()
}
+
// TestBroker_Consume_Errored_Attempts verifies a failing job configured
// with Attempts: 3 is retried until the error handler has run 3 times.
func TestBroker_Consume_Errored_Attempts(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	// attempts is only mutated inside errHandler and read after three
	// sends on errHandled, which orders the accesses
	attempts := 0
	errHandled := make(chan interface{})
	errHandler := func(id string, j *jobs.Job, err error) {
		assert.Equal(t, "job failed", err.Error())
		attempts++
		errHandled <- nil
	}

	exec := make(chan jobs.Handler, 1)

	assert.NoError(t, b.Consume(pipe, exec, errHandler))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Attempts: 3},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		return fmt.Errorf("job failed")
	}

	<-errHandled
	<-errHandled
	<-errHandled
	assert.Equal(t, 3, attempts)
}
diff --git a/plugins/jobs/broker/sqs/durability_test.go b/plugins/jobs/broker/sqs/durability_test.go
new file mode 100644
index 00000000..58ddf4b9
--- /dev/null
+++ b/plugins/jobs/broker/sqs/durability_test.go
@@ -0,0 +1,588 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "net"
+ "sync"
+ "testing"
+ "time"
+)
+
var (
	// proxyCfg points the broker at the local tcpProxy (9325) instead of
	// the SQS emulator (9324) directly, so connections can be dropped on
	// demand during durability tests.
	proxyCfg = &Config{
		Key:      "api-key",
		Secret:   "api-secret",
		Region:   "us-west-1",
		Endpoint: "http://localhost:9325",
		Timeout:  1,
	}

	// proxy forwards localhost:9325 -> localhost:9324 and can refuse or
	// sever connections mid-test.
	proxy = &tcpProxy{
		listen:   "localhost:9325",
		upstream: "localhost:9324",
		accept:   true,
	}

	// proxyPipe declares the queue used by durability tests.
	proxyPipe = &jobs.Pipeline{
		"broker":       "sqs",
		"name":         "default",
		"queue":        "test",
		"lockReserved": 1,
		"declare": map[string]interface{}{
			"MessageRetentionPeriod": 86400,
		},
	}
)
+
// tcpProxy is a minimal TCP proxy used to simulate connection loss
// between the broker and the SQS endpoint in durability tests.
type tcpProxy struct {
	listen   string
	upstream string
	mu       sync.Mutex // guards accept and conn
	accept   bool       // whether new connections are proxied
	conn     []net.Conn // live connection pairs (in, up interleaved)
}

// serve accepts inbound connections and pipes them to the upstream
// address. Connections arriving while the proxy is not accepting are
// closed immediately. Panics on listener/dial errors (test helper).
func (p *tcpProxy) serve() {
	l, err := net.Listen("tcp", p.listen)
	if err != nil {
		panic(err)
	}

	for {
		in, err := l.Accept()
		if err != nil {
			panic(err)
		}

		if !p.accepting() {
			// BUG FIX: previously the refused connection was closed but
			// the loop still dialed upstream, proxied the dead socket and
			// registered it in p.conn; skip it entirely instead.
			in.Close()
			continue
		}

		up, err := net.Dial("tcp", p.upstream)
		if err != nil {
			panic(err)
		}

		// copy errors are expected when connections are severed by reset()
		go io.Copy(in, up)
		go io.Copy(up, in)

		p.mu.Lock()
		p.conn = append(p.conn, in, up)
		p.mu.Unlock()
	}
}

// waitConn enables accepting and blocks until at least count proxied
// connections (count*2 sockets) are registered.
func (p *tcpProxy) waitConn(count int) *tcpProxy {
	p.mu.Lock()
	p.accept = true
	p.mu.Unlock()

	for {
		p.mu.Lock()
		current := len(p.conn)
		p.mu.Unlock()

		if current >= count*2 {
			break
		}

		time.Sleep(time.Millisecond)
	}

	return p
}

// reset severs all proxied connections, sets the accepting flag and
// returns the number of connection pairs that were closed.
func (p *tcpProxy) reset(accept bool) int {
	p.mu.Lock()
	p.accept = accept
	defer p.mu.Unlock()

	count := 0
	for _, conn := range p.conn {
		conn.Close()
		count++
	}

	p.conn = nil
	return count / 2 // two sockets per proxied connection
}

// accepting reports whether new inbound connections will be proxied.
func (p *tcpProxy) accepting() bool {
	p.mu.Lock()
	defer p.mu.Unlock()

	return p.accept
}
+
// init starts the shared proxy for all durability tests in this file.
func init() {
	go proxy.serve()
}
+
// TestBroker_Durability_Base runs the happy path through the proxy: one
// connection pair, one pushed job, one successful handler invocation.
func TestBroker_Durability_Base(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	// expect 2 connections
	proxy.waitConn(1)

	jid, perr := b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)
		close(waitJob)
		return nil
	}

	<-waitJob
}
+
// TestBroker_Durability_Consume severs the proxy, confirms Push fails,
// then restores the connection and confirms the broker recovers and
// delivers a subsequently pushed job.
func TestBroker_Durability_Consume(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	// sever the connection: the next push must fail
	proxy.waitConn(1).reset(false)

	jid, perr := b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore
	proxy.waitConn(1)

	jid, perr = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	// busy-wait until the single job has been processed
	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 1 {
			break
		}
	}
}
+
// TestBroker_Durability_Consume_LongTimeout is the recovery scenario with
// an extended (3s) outage before the connection is restored.
func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(1).reset(false)

	jid, perr := b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore only after the outage outlasts the configured timeout
	time.Sleep(3 * time.Second)
	proxy.waitConn(1)

	jid, perr = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 1 {
			break
		}
	}
}
+
// TestBroker_Durability_Consume2 extends the recovery scenario with Stat
// calls while the proxy is alternately severed and restored.
func TestBroker_Durability_Consume2(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(1).reset(false)

	jid, perr := b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{},
	})

	assert.Error(t, perr)

	// restore
	proxy.waitConn(2)

	jid, perr = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	// the job is either still queued or already reserved by a consumer
	st, serr := b.Stat(proxyPipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(1), st.Queue+st.Active)

	proxy.reset(false)

	_, serr = b.Stat(proxyPipe)
	assert.Error(t, serr)

	proxy.reset(true)

	_, serr = b.Stat(proxyPipe)
	assert.NoError(t, serr)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 1 {
			break
		}
	}
}
+
// TestBroker_Durability_Consume3 verifies Stat and consumption through a
// healthy proxy connection (no induced failures).
func TestBroker_Durability_Consume3(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(1)

	jid, perr := b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	st, serr := b.Stat(proxyPipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(1), st.Queue+st.Active)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		return nil
	}

	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 1 {
			break
		}
	}
}
+
// TestBroker_Durability_Consume4 pushes three jobs; the first ("kill")
// severs all proxy connections from inside its handler, and the test
// asserts all three jobs are still eventually processed.
func TestBroker_Durability_Consume4(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	proxy.waitConn(1)

	_, err = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "kill",
		Options: &jobs.Options{Timeout: 2},
	})
	if err != nil {
		t.Fatal(err)
	}
	_, err = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})
	if err != nil {
		t.Fatal(err)
	}

	_, err = b.Push(proxyPipe, &jobs.Job{
		Job:     "test",
		Payload: "body",
		Options: &jobs.Options{Timeout: 2},
	})
	if err != nil {
		t.Fatal(err)
	}

	st, serr := b.Stat(proxyPipe)
	assert.NoError(t, serr)
	assert.Equal(t, int64(3), st.Queue+st.Active)

	mu := sync.Mutex{}
	done := make(map[string]bool)
	exec <- func(id string, j *jobs.Job) error {
		mu.Lock()
		defer mu.Unlock()
		done[id] = true

		// simulate a connection drop mid-consumption
		if j.Payload == "kill" {
			proxy.reset(true)
		}

		return nil
	}

	for {
		mu.Lock()
		num := len(done)
		mu.Unlock()

		if num >= 3 {
			break
		}
	}
}
+
// TestBroker_Durability_StopDead asserts Stop does not hang when the
// connection to the endpoint is dead.
func TestBroker_Durability_StopDead(t *testing.T) {
	defer proxy.reset(true)

	b := &Broker{}
	_, err := b.Init(proxyCfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(proxyPipe)
	if err != nil {
		t.Fatal(err)
	}
	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)
	assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))

	go func() { assert.NoError(t, b.Serve()) }()

	<-ready

	// sever all connections, then stop the dead broker
	proxy.waitConn(1).reset(false)

	b.Stop()
}
diff --git a/plugins/jobs/broker/sqs/job.go b/plugins/jobs/broker/sqs/job.go
new file mode 100644
index 00000000..50e2c164
--- /dev/null
+++ b/plugins/jobs/broker/sqs/job.go
@@ -0,0 +1,80 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "time"
+)
+
// jobAttributes lists the SQS message attributes used to carry job
// metadata between pack and unpack; unpack rejects messages missing any
// of them (mixed queue protection).
var jobAttributes = []*string{
	aws.String("rr-job"),
	aws.String("rr-maxAttempts"),
	aws.String("rr-delay"),
	aws.String("rr-timeout"),
	aws.String("rr-retryDelay"),
}
+
// pack job metadata into headers
// It builds a SendMessageInput for the given queue URL, carrying the job
// options in the rr-* message attributes read back by unpack.
func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
	return &sqs.SendMessageInput{
		QueueUrl:     url,
		DelaySeconds: aws.Int64(int64(j.Options.Delay)),
		MessageBody:  aws.String(j.Payload),
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			"rr-job":         {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
			"rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
			"rr-delay":       {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
			"rr-timeout":     {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
			// NOTE(review): rr-retryDelay uses DataType "Number" while the
			// other attributes use "String"; unpack parses StringValue either
			// way — confirm the asymmetry is intentional.
			"rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
		},
	}
}
+
// unpack restores jobs.Options
// It rebuilds the job from an SQS message, returning the message id, the
// zero-based attempt number (ApproximateReceiveCount - 1) and the job.
// Messages missing any expected attribute are rejected.
func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
	if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
		return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
	}
	attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])

	for _, attr := range jobAttributes {
		if _, ok := msg.MessageAttributes[*attr]; !ok {
			return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
		}
	}

	j = &jobs.Job{
		Job:     *msg.MessageAttributes["rr-job"].StringValue,
		Payload: *msg.Body,
		Options: &jobs.Options{},
	}

	// the err declared in each if-statement deliberately shadows the named
	// return: unparsable values simply leave the option at its zero value
	if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
		j.Options.Delay = delay
	}

	if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
		j.Options.Attempts = maxAttempts
	}

	if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
		j.Options.Timeout = timeout
	}

	if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
		j.Options.RetryDelay = retryDelay
	}

	return *msg.MessageId, attempt - 1, j, nil
}
+
+func awsString(n int) *string {
+ return aws.String(strconv.Itoa(n))
+}
+
+func awsDuration(d time.Duration) *string {
+ return aws.String(strconv.Itoa(int(d.Seconds())))
+}
diff --git a/plugins/jobs/broker/sqs/job_test.go b/plugins/jobs/broker/sqs/job_test.go
new file mode 100644
index 00000000..a120af53
--- /dev/null
+++ b/plugins/jobs/broker/sqs/job_test.go
@@ -0,0 +1,19 @@
+package sqs
+
+import (
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// Test_Unpack expects unpack to reject a message that lacks the
// ApproximateReceiveCount attribute and the rr-* message attributes.
func Test_Unpack(t *testing.T) {
	msg := &sqs.Message{
		Body:              aws.String("body"),
		Attributes:        map[string]*string{},
		MessageAttributes: map[string]*sqs.MessageAttributeValue{},
	}

	_, _, _, err := unpack(msg)
	assert.Error(t, err)
}
diff --git a/plugins/jobs/broker/sqs/queue.go b/plugins/jobs/broker/sqs/queue.go
new file mode 100644
index 00000000..8a92448e
--- /dev/null
+++ b/plugins/jobs/broker/sqs/queue.go
@@ -0,0 +1,266 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// queue wraps a single SQS queue consumed by the broker.
type queue struct {
	active       int32          // atomic serving flag (1 while serve runs)
	pipe         *jobs.Pipeline // pipeline configuration
	url          *string        // resolved queue URL
	reserve      time.Duration  // ReceiveMessage long-poll wait
	lockReserved time.Duration  // initial visibility timeout for received messages

	// queue events
	lsn func(event int, ctx interface{})

	// stop channel
	wait chan interface{}

	// active operations
	muw sync.RWMutex
	wg  sync.WaitGroup

	// exec handlers
	execPool   chan jobs.Handler
	errHandler jobs.ErrorHandler
}
+
+func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
+ if pipe.String("queue", "") == "" {
+ return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name())
+ }
+
+ return &queue{
+ pipe: pipe,
+ reserve: pipe.Duration("reserve", time.Second),
+ lockReserved: pipe.Duration("lockReserved", 300*time.Second),
+ lsn: lsn,
+ }, nil
+}
+
+// declareQueue declared queue
+func (q *queue) declareQueue(s *sqs.SQS) (*string, error) {
+ attr := make(map[string]*string)
+ for k, v := range q.pipe.Map("declare") {
+ if vs, ok := v.(string); ok {
+ attr[k] = aws.String(vs)
+ }
+
+ if vi, ok := v.(int); ok {
+ attr[k] = aws.String(strconv.Itoa(vi))
+ }
+
+ if vb, ok := v.(bool); ok {
+ if vb {
+ attr[k] = aws.String("true")
+ } else {
+ attr[k] = aws.String("false")
+ }
+ }
+ }
+
+ if len(attr) != 0 {
+ r, err := s.CreateQueue(&sqs.CreateQueueInput{
+ QueueName: aws.String(q.pipe.String("queue", "")),
+ Attributes: attr,
+ })
+
+ return r.QueueUrl, err
+ }
+
+ // no need to create (get existed)
+ r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))})
+ if err != nil {
+ return nil, err
+ }
+
+ return r.QueueUrl, nil
+}
+
// serve consumers
// serve polls SQS until stop() is called, dispatching each received
// message to a handler borrowed from execPool. On a receive error the
// first occurrence is reported; repeats only back off for tout.
func (q *queue) serve(s *sqs.SQS, tout time.Duration) {
	q.wait = make(chan interface{})
	atomic.StoreInt32(&q.active, 1)

	var errored bool
	for {
		messages, stop, err := q.consume(s)
		if err != nil {
			if errored {
				// reoccurring error
				time.Sleep(tout)
			} else {
				errored = true
				q.report(err)
			}

			continue
		}
		errored = false

		if stop {
			return
		}

		for _, msg := range messages {
			// borrow a handler; blocks when all workers are busy
			h := <-q.execPool
			go func(h jobs.Handler, msg *sqs.Message) {
				err := q.do(s, h, msg)
				q.execPool <- h
				q.wg.Done() // paired with wg.Add in consume
				q.report(err)
			}(h, msg)
		}
	}
}
+
// consume and allocate connection.
// It returns (nil, true, nil) once the stop channel is closed; otherwise
// it long-polls SQS for up to `prefetch` messages and registers them on
// the WaitGroup before returning them.
func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) {
	// held so stop() cannot drain the WaitGroup mid-receive
	q.muw.Lock()
	defer q.muw.Unlock()

	select {
	case <-q.wait:
		return nil, true, nil
	default:
		r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:              q.url,
			MaxNumberOfMessages:   aws.Int64(int64(q.pipe.Integer("prefetch", 1))),
			WaitTimeSeconds:       aws.Int64(int64(q.reserve.Seconds())),
			VisibilityTimeout:     aws.Int64(int64(q.lockReserved.Seconds())),
			AttributeNames:        []*string{aws.String("ApproximateReceiveCount")},
			MessageAttributeNames: jobAttributes,
		})
		if err != nil {
			return nil, false, err
		}

		q.wg.Add(len(r.Messages))

		return r.Messages, false, nil
	}
}
+
// do single message
// It unpacks the message, extends its visibility to the job timeout, runs
// the handler, and on failure either deletes the message (attempts
// exhausted) or re-releases it after the configured retry delay.
func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) {
	id, attempt, j, err := unpack(msg)
	if err != nil {
		// unparsable message: drop it asynchronously and surface the error
		go q.deleteMessage(s, msg, err)
		return err
	}

	// block the job based on known timeout
	_, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          q.url,
		ReceiptHandle:     msg.ReceiptHandle,
		VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())),
	})
	if err != nil {
		go q.deleteMessage(s, msg, err)
		return err
	}

	err = h(id, j)
	if err == nil {
		return q.deleteMessage(s, msg, nil)
	}

	q.errHandler(id, j, err)

	if !j.Options.CanRetry(attempt) {
		return q.deleteMessage(s, msg, err)
	}

	// retry after specified duration: shortening the visibility timeout
	// makes the message reappear for another receive
	_, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          q.url,
		ReceiptHandle:     msg.ReceiptHandle,
		VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)),
	})

	return err
}
+
// deleteMessage removes the message from the queue and returns the delete
// error, if any.
// NOTE(review): the err parameter (the cause of the deletion) is currently
// unused — confirm whether it was meant to be reported or logged.
func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error {
	_, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle})
	return drr
}
+
// stop the queue consuming
// It signals serve to exit and blocks until all in-flight handlers
// finish; muw excludes a concurrent receive while draining.
func (q *queue) stop() {
	if atomic.LoadInt32(&q.active) == 0 {
		return
	}

	atomic.StoreInt32(&q.active, 0)

	close(q.wait)
	q.muw.Lock()
	q.wg.Wait()
	q.muw.Unlock()
}
+
+// add job to the queue
+func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) {
+ r, err := s.SendMessage(pack(q.url, j))
+ if err != nil {
+ return "", err
+ }
+
+ return *r.MessageId, nil
+}
+
+// return queue stats
+func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
+ r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{
+ QueueUrl: q.url,
+ AttributeNames: []*string{
+ aws.String("ApproximateNumberOfMessages"),
+ aws.String("ApproximateNumberOfMessagesDelayed"),
+ aws.String("ApproximateNumberOfMessagesNotVisible"),
+ },
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")}
+
+ for a, v := range r.Attributes {
+ if a == "ApproximateNumberOfMessages" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Queue = int64(v)
+ }
+ }
+
+ if a == "ApproximateNumberOfMessagesNotVisible" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Active = int64(v)
+ }
+ }
+
+ if a == "ApproximateNumberOfMessagesDelayed" {
+ if v, err := strconv.Atoi(*v); err == nil {
+ stat.Delayed = int64(v)
+ }
+ }
+ }
+
+ return stat, nil
+}
+
+// throw handles service, server and pool events.
+func (q *queue) report(err error) {
+ if err != nil {
+ q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
+ }
+}
diff --git a/plugins/jobs/broker/sqs/stat_test.go b/plugins/jobs/broker/sqs/stat_test.go
new file mode 100644
index 00000000..5031571b
--- /dev/null
+++ b/plugins/jobs/broker/sqs/stat_test.go
@@ -0,0 +1,60 @@
+package sqs
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// TestBroker_Stat checks Stat can be called before, during and after a
// job is consumed (SQS counters are approximate, so only errors are
// asserted, not exact counts).
func TestBroker_Stat(t *testing.T) {
	b := &Broker{}
	_, err := b.Init(cfg)
	if err != nil {
		t.Fatal(err)
	}
	err = b.Register(pipe)
	if err != nil {
		t.Fatal(err)
	}

	ready := make(chan interface{})
	b.Listen(func(event int, ctx interface{}) {
		if event == jobs.EventBrokerReady {
			close(ready)
		}
	})

	exec := make(chan jobs.Handler, 1)

	go func() { assert.NoError(t, b.Serve()) }()
	defer b.Stop()

	<-ready

	jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})

	assert.NotEqual(t, "", jid)
	assert.NoError(t, perr)

	// unable to use approximated stats
	_, err = b.Stat(pipe)
	assert.NoError(t, err)

	assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))

	waitJob := make(chan interface{})
	exec <- func(id string, j *jobs.Job) error {
		assert.Equal(t, jid, id)
		assert.Equal(t, "body", j.Payload)

		// Stat while the job is in-flight
		_, err := b.Stat(pipe)
		assert.NoError(t, err)

		close(waitJob)
		return nil
	}

	<-waitJob
	_, err = b.Stat(pipe)
	assert.NoError(t, err)
}
diff --git a/plugins/jobs/broker_test.go b/plugins/jobs/broker_test.go
new file mode 100644
index 00000000..9625e24b
--- /dev/null
+++ b/plugins/jobs/broker_test.go
@@ -0,0 +1,314 @@
+package jobs
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// testBroker run testQueue using local goroutines.
type testBroker struct {
	lsn     func(event int, ctx interface{}) // event listener, may be nil
	mu      sync.Mutex                       // guards wait, stopped and queues
	wait    chan error                       // closed/sent on Stop; nil when not serving
	stopped chan interface{}                 // closed when Serve has fully returned
	queues  map[*Pipeline]*testQueue         // registered pipelines
}
+
+// Listen attaches server event watcher.
+func (b *testBroker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures broker.
+func (b *testBroker) Init() (bool, error) {
+ b.queues = make(map[*Pipeline]*testQueue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *testBroker) Register(pipe *Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("testQueue `%s` has already been registered", pipe.Name())
+ }
+
+ b.queues[pipe] = newQueue()
+
+ return nil
+}
+
// Serve broker pipelines.
// It starts a serving goroutine for every queue with an exec pool, emits
// EventBrokerReady and blocks until Stop closes the wait channel.
func (b *testBroker) Serve() error {
	// start pipelines
	b.mu.Lock()
	for _, q := range b.queues {
		qq := q // per-iteration copy for the goroutine (pre-Go1.22 capture)
		if qq.execPool != nil {
			go qq.serve()
		}
	}
	b.wait = make(chan error)
	b.stopped = make(chan interface{})
	defer close(b.stopped) // lets Stop observe full shutdown

	b.mu.Unlock()

	b.throw(EventBrokerReady, b)

	return <-b.wait
}
+
// Stop all pipelines.
// No-op when Serve was never started; otherwise stops every queue,
// releases Serve and waits until it has returned.
func (b *testBroker) Stop() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.wait == nil {
		return
	}

	// stop all pipelines
	for _, q := range b.queues {
		q.stop()
	}

	close(b.wait)
	<-b.stopped
}
+
// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
// the service is started!
func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	q, ok := b.queues[pipe]
	if !ok {
		return fmt.Errorf("undefined testQueue `%s`", pipe.Name())
	}

	// restart the queue with the new handler set
	q.stop()

	q.execPool = execPool
	q.errHandler = errHandler

	// resume serving immediately when the broker is already running
	if b.wait != nil {
		if q.execPool != nil {
			go q.serve()
		}
	}

	return nil
}
+
// Push job into the worker.
// Generates a fresh UUID for the job id and enqueues it with the job's
// configured delay; fails when the broker is not serving.
func (b *testBroker) Push(pipe *Pipeline, j *Job) (string, error) {
	if err := b.isServing(); err != nil {
		return "", err
	}

	q := b.queue(pipe)
	if q == nil {
		return "", fmt.Errorf("undefined testQueue `%s`", pipe.Name())
	}

	id, err := uuid.NewV4()
	if err != nil {
		return "", err
	}

	// attempt 0; retries increment the counter inside testQueue.do
	q.push(id.String(), j, 0, j.Options.DelayDuration())

	return id.String(), nil
}
+
+// Stat must consume statistics about given pipeline or return error.
+func (b *testBroker) Stat(pipe *Pipeline) (stat *Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined testQueue `%s`", pipe.Name())
+ }
+
+ return q.stat(), nil
+}
+
+// check if broker is serving
+func (b *testBroker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// testQueue returns testQueue associated with the pipeline.
+func (b *testBroker) queue(pipe *Pipeline) *testQueue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
+// throw handles service, server and pool events.
+func (b *testBroker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}
+
// testQueue is an in-memory job queue driven by goroutines and channels.
type testQueue struct {
	active int32 // atomic serving flag (1 while serve runs)
	st     *Stat // counters mutated atomically

	// job pipeline
	jobs chan *entry

	// pipelines operations
	muw sync.Mutex
	wg  sync.WaitGroup

	// stop channel
	wait chan interface{}

	// exec handlers
	execPool   chan Handler
	errHandler ErrorHandler
}
+
// entry is a single queued job together with its retry attempt counter.
type entry struct {
	id      string
	job     *Job
	attempt int
}
+
+// create new testQueue
+func newQueue() *testQueue {
+ return &testQueue{st: &Stat{}, jobs: make(chan *entry)}
+}
+
+// todo NOT USED
+// associate testQueue with new do pool
+//func (q *testQueue) configure(execPool chan Handler, err ErrorHandler) error {
+// q.execPool = execPool
+// q.errHandler = err
+//
+// return nil
+//}
+
// serve consumers
// serve pulls entries until stop() is signalled, dispatching each to a
// handler borrowed from execPool and keeping the Active counter current.
func (q *testQueue) serve() {
	q.wait = make(chan interface{})
	atomic.StoreInt32(&q.active, 1)

	for {
		e := q.consume()
		if e == nil {
			return
		}

		atomic.AddInt64(&q.st.Active, 1)
		h := <-q.execPool
		go func(e *entry) {
			q.do(h, e)
			atomic.AddInt64(&q.st.Active, ^int64(0)) // atomic decrement
			q.execPool <- h
			q.wg.Done() // paired with wg.Add in consume
		}(e)
	}
}
+
// allocate one job entry
// Returns nil once the stop channel has been closed; otherwise blocks for
// the next entry and registers it on the WaitGroup.
func (q *testQueue) consume() *entry {
	// held so stop() cannot drain the WaitGroup mid-allocation
	q.muw.Lock()
	defer q.muw.Unlock()

	select {
	case <-q.wait:
		return nil
	case e := <-q.jobs:
		q.wg.Add(1)

		return e
	}
}
+
// do single job
// Runs the handler; on failure either drops the job (attempts exhausted)
// or re-queues it with an incremented attempt counter after the retry delay.
func (q *testQueue) do(h Handler, e *entry) {
	err := h(e.id, e.job)

	if err == nil {
		atomic.AddInt64(&q.st.Queue, ^int64(0)) // atomic decrement
		return
	}

	q.errHandler(e.id, e.job, err)

	if !e.job.Options.CanRetry(e.attempt) {
		atomic.AddInt64(&q.st.Queue, ^int64(0))
		return
	}

	q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
}
+
// stop the testQueue pipelines
// Signals serve to exit and waits for all in-flight handlers to finish.
func (q *testQueue) stop() {
	if atomic.LoadInt32(&q.active) == 0 {
		return
	}

	atomic.StoreInt32(&q.active, 0)

	close(q.wait)
	q.muw.Lock()
	q.wg.Wait()
	q.muw.Unlock()
}
+
// add job to the testQueue
// Immediate jobs are handed to the consumer via a goroutine (the jobs
// channel is unbuffered); delayed jobs sleep first and move from the
// Delayed to the Queue counter when released.
func (q *testQueue) push(id string, j *Job, attempt int, delay time.Duration) {
	if delay == 0 {
		atomic.AddInt64(&q.st.Queue, 1)
		go func() {
			q.jobs <- &entry{id: id, job: j, attempt: attempt}
		}()

		return
	}

	atomic.AddInt64(&q.st.Delayed, 1)
	go func() {
		time.Sleep(delay)
		atomic.AddInt64(&q.st.Delayed, ^int64(0)) // atomic decrement
		atomic.AddInt64(&q.st.Queue, 1)

		q.jobs <- &entry{id: id, job: j, attempt: attempt}
	}()
}
+
+func (q *testQueue) stat() *Stat {
+ return &Stat{
+ InternalName: ":memory:",
+ Queue: atomic.LoadInt64(&q.st.Queue),
+ Active: atomic.LoadInt64(&q.st.Active),
+ Delayed: atomic.LoadInt64(&q.st.Delayed),
+ }
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..674bf468
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,91 @@
+package jobs
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+)
+
// Config defines settings for job broker, workers and job-pipeline mapping.
type Config struct {
	// Workers configures roadrunner server and worker pool.
	Workers *roadrunner.ServerConfig

	// Dispatch defines where and how to match jobs.
	Dispatch map[string]*Options

	// Pipelines defines mapping between PHP job pipeline and associated job broker.
	Pipelines map[string]*Pipeline

	// Consume specifies names of pipelines to be consumed on service start.
	Consume []string

	// parent config for broker options.
	parent service.Config
	// pipelines is the validated, named form of Pipelines (built in Hydrate).
	pipelines Pipelines
	// route maps jobs to their options/pipelines (built in Hydrate).
	route Dispatcher
}
+
+// Hydrate populates config values.
+func (c *Config) Hydrate(cfg service.Config) (err error) {
+ c.Workers = &roadrunner.ServerConfig{}
+ c.Workers.InitDefaults()
+
+ if err := cfg.Unmarshal(&c); err != nil {
+ return err
+ }
+
+ c.pipelines, err = initPipelines(c.Pipelines)
+ if err != nil {
+ return err
+ }
+
+ if c.Workers.Command != "" {
+ if err := c.Workers.Pool.Valid(); err != nil {
+ return c.Workers.Pool.Valid()
+ }
+ }
+
+ c.parent = cfg
+ c.route = initDispatcher(c.Dispatch)
+
+ return nil
+}
+
+// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
+ opt := c.route.match(job)
+
+ pipe := ""
+ if job.Options != nil {
+ pipe = job.Options.Pipeline
+ }
+
+ if pipe == "" && opt != nil {
+ pipe = opt.Pipeline
+ }
+
+ if pipe == "" {
+ return nil, nil, fmt.Errorf("unable to locate pipeline for `%s`", job.Job)
+ }
+
+ if p := c.pipelines.Get(pipe); p != nil {
+ return p, opt, nil
+ }
+
+ return nil, nil, fmt.Errorf("undefined pipeline `%s`", pipe)
+}
+
+// Get underlying broker config.
+func (c *Config) Get(service string) service.Config {
+ if c.parent == nil {
+ return nil
+ }
+
+ return c.parent.Get(service)
+}
+
// Unmarshal is doing nothing: the jobs config has no nested payload to decode,
// but the method is required to satisfy the service.Config interface.
func (c *Config) Unmarshal(out interface{}) error {
	return nil
}
diff --git a/plugins/jobs/config_test.go b/plugins/jobs/config_test.go
new file mode 100644
index 00000000..c55a5c5f
--- /dev/null
+++ b/plugins/jobs/config_test.go
@@ -0,0 +1,158 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// mockCfg is a minimal service.Config backed by a raw JSON string.
type mockCfg struct{ cfg string }

// Get returns the same config for the "same" and "jobs" sections, nil otherwise.
func (cfg *mockCfg) Get(name string) service.Config {
	if name == "same" || name == "jobs" {
		return cfg
	}

	return nil
}

// Unmarshal decodes the raw JSON payload into out.
func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }

// Hydrate must fail on malformed JSON.
func Test_Config_Hydrate_Error(t *testing.T) {
	cfg := &mockCfg{cfg: `{"dead`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}

// Hydrate succeeds on a minimal valid config.
func Test_Config_Hydrate_OK(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{"pool":{"numWorkers": 1}}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))
}

// Config.Unmarshal is a stub and always returns nil.
func Test_Config_Hydrate_Unmarshal(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{"pool":{"numWorkers": 1}}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	var i interface{}
	assert.Nil(t, c.Unmarshal(i))
}

// Get returns nil for unknown sections.
func Test_Config_Hydrate_Get(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{"pool":{"numWorkers": 1}}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	assert.Nil(t, c.Get("nil"))
}

// Get delegates known sections to the parent config.
func Test_Config_Hydrate_Get_Valid(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{"pool":{"numWorkers": 1}}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	assert.Equal(t, cfg, c.Get("same"))
}

// Get on a never-hydrated config (no parent) returns nil.
func Test_Config_Hydrate_GetNoParent(t *testing.T) {
	c := &Config{}
	assert.Nil(t, c.Get("nil"))
}

// Pipelines are named after their config keys and keep their broker.
func Test_Pipelines(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{
		"pool":{"numWorkers": 1}
	},
	"pipelines":{
		"pipe": {"broker":"broker"}
	},
	"dispatch":{
		"job.*": {"pipeline":"default"}
	}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	assert.Equal(t, "pipe", c.pipelines.Get("pipe").Name())
	assert.Equal(t, "broker", c.pipelines.Get("pipe").Broker())
}

// A pipeline without a broker must fail hydration.
func Test_Pipelines_NoBroker(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{
		"pool":{"numWorkers": 1}
	},
	"pipelines":{
		"pipe": {}
	},
	"dispatch":{
		"job.*": {"pipeline":"default"}
	}
}`}
	c := &Config{}

	assert.Error(t, c.Hydrate(cfg))
}

// MatchPipeline resolves explicit options, dispatch routes and unknown jobs.
func Test_MatchPipeline(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{
		"pool":{"numWorkers": 1}
	},
	"pipelines":{
		"pipe": {"broker":"default"}
	},
	"dispatch":{
		"job.*": {"pipeline":"pipe","delay":10}
	}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	_, _, err := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{}})
	assert.Error(t, err)

	p, _, _ := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{Pipeline: "pipe"}})
	assert.Equal(t, "pipe", p.Name())

	p, opt, _ := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}})
	assert.Equal(t, "pipe", p.Name())
	assert.Equal(t, 10, opt.Delay)
}

// MatchPipeline fails when the dispatch route points at an undefined pipeline.
func Test_MatchPipeline_Error(t *testing.T) {
	cfg := &mockCfg{cfg: `{
	"workers":{
		"pool":{"numWorkers": 1}
	},
	"pipelines":{
		"pipe": {"broker":"default"}
	},
	"dispatch":{
		"job.*": {"pipeline":"missing"}
	}
}`}
	c := &Config{}

	assert.NoError(t, c.Hydrate(cfg))

	_, _, err := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}})
	assert.Error(t, err)
}
diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go
new file mode 100644
index 00000000..9fde8fac
--- /dev/null
+++ b/plugins/jobs/dispatcher.go
@@ -0,0 +1,47 @@
+package jobs
+
+import (
+ "strings"
+)
+
+var separators = []string{"/", "-", "\\"}
+
+// Dispatcher provides ability to automatically locate the pipeline for the specific job
+// and update job options (if none set).
+type Dispatcher map[string]*Options
+
+// pre-compile patterns
+func initDispatcher(routes map[string]*Options) Dispatcher {
+ dispatcher := make(Dispatcher)
+ for pattern, opts := range routes {
+ pattern = strings.ToLower(pattern)
+ pattern = strings.Trim(pattern, "-.*")
+
+ for _, s := range separators {
+ pattern = strings.Replace(pattern, s, ".", -1)
+ }
+
+ dispatcher[pattern] = opts
+ }
+
+ return dispatcher
+}
+
+// match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) match(job *Job) (found *Options) {
+ var best = 0
+
+ jobName := strings.ToLower(job.Job)
+ for pattern, opts := range dispatcher {
+ if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
+ found = opts
+ best = len(pattern)
+ }
+ }
+
+ if best == 0 {
+ return nil
+ }
+
+ return found
+}
diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go
new file mode 100644
index 00000000..59e3fd4e
--- /dev/null
+++ b/plugins/jobs/dispatcher_test.go
@@ -0,0 +1,53 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// An exact pattern matches a job with the same name.
func Test_Map_All(t *testing.T) {
	m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}})
	assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline)
}

// A non-matching job yields nil options.
func Test_Map_Miss(t *testing.T) {
	m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}})

	assert.Nil(t, m.match(&Job{Job: "miss"}))
}

// The longest matching prefix wins over shorter ones.
func Test_Map_Best(t *testing.T) {
	m := initDispatcher(map[string]*Options{
		"some.*":       {Pipeline: "default"},
		"some.other.*": {Pipeline: "other"},
	})

	assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
	assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
	assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
	assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
}

// Matching is case-insensitive on both patterns and job names.
func Test_Map_BestUpper(t *testing.T) {
	m := initDispatcher(map[string]*Options{
		"some.*":       {Pipeline: "default"},
		"some.Other.*": {Pipeline: "other"},
	})

	assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
	assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
	assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline)
	assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline)
}

// Longest-prefix selection is independent of query order.
func Test_Map_BestReversed(t *testing.T) {
	m := initDispatcher(map[string]*Options{
		"some.*":       {Pipeline: "default"},
		"some.other.*": {Pipeline: "other"},
	})

	assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
	assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
	assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
	assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
new file mode 100644
index 00000000..a8e3778f
--- /dev/null
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-15T11:53:27.842Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="FDJoAwnkRul0Vyh6IqdT" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">ddHBEoIgEADQr+FOoM10NquLJw+dGdmEGXQdpNH6+nTAjKwTy9tdYBbCs2Y8W9GpAiUYwqgcCT8SxnZJsp+WWR5eDmnqobZahqIVSv2EgDToXUvoo0KHaJzuYqywbaFykQlrcYjLbmjiWztRwwbKSpitXrV0yiujlK6JC+haue9MI5bqAL0SEocP4jnhmUV0PmrGDMw8vWUwvu/0J/t+mYXW/WiYgvXsaRN9Ec9f</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/event.go b/plugins/jobs/event.go
new file mode 100644
index 00000000..68dd34e5
--- /dev/null
+++ b/plugins/jobs/event.go
@@ -0,0 +1,96 @@
+package jobs
+
+import "time"
+
const (
	// EventPushOK thrown when new job has been added. JobEvent is passed as context.
	EventPushOK = iota + 1500

	// EventPushError caused when job can not be registered.
	EventPushError

	// EventJobStart thrown when new job received.
	EventJobStart

	// EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
	EventJobOK

	// EventJobError thrown on all job related errors. See JobError as context.
	EventJobError

	// EventPipeConsume thrown when pipeline consuming has been requested.
	EventPipeConsume

	// EventPipeActive thrown when pipeline has started.
	EventPipeActive

	// EventPipeStop thrown when pipeline has begun stopping.
	EventPipeStop

	// EventPipeStopped thrown when pipeline has been stopped.
	EventPipeStopped

	// EventPipeError thrown when pipeline specific error happen.
	EventPipeError

	// EventBrokerReady thrown when broker is ready to accept/serve tasks.
	EventBrokerReady
)
+
// JobEvent represent job event.
type JobEvent struct {
	// ID is the job id.
	ID string

	// Job is the job the event relates to.
	Job *Job

	// event timings
	start   time.Time
	elapsed time.Duration
}

// Elapsed returns duration of the invocation.
func (e *JobEvent) Elapsed() time.Duration {
	return e.elapsed
}

// JobError represents singular Job error event.
type JobError struct {
	// ID is the id of the failed job.
	ID string

	// Job is failed job.
	Job *Job

	// Caused contains job specific error.
	Caused error

	// event timings
	start   time.Time
	elapsed time.Duration
}

// Elapsed returns duration of the invocation.
func (e *JobError) Elapsed() time.Duration {
	return e.elapsed
}

// Error returns the message of the causing error, satisfying the error interface.
func (e *JobError) Error() string {
	return e.Caused.Error()
}

// PipelineError defines pipeline specific errors.
type PipelineError struct {
	// Pipeline is associated pipeline.
	Pipeline *Pipeline

	// Caused is the error sent by broker.
	Caused error
}

// Error returns error message.
func (e *PipelineError) Error() string {
	return e.Caused.Error()
}
diff --git a/plugins/jobs/event_test.go b/plugins/jobs/event_test.go
new file mode 100644
index 00000000..94d53531
--- /dev/null
+++ b/plugins/jobs/event_test.go
@@ -0,0 +1,52 @@
+package jobs
+
+import (
+ "errors"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// JobEvent.Elapsed returns the stored duration.
func TestJobEvent_Elapsed(t *testing.T) {
	e := &JobEvent{
		ID:      "id",
		Job:     &Job{},
		start:   time.Now(),
		elapsed: time.Millisecond,
	}

	assert.Equal(t, time.Millisecond, e.Elapsed())
}

// JobError.Elapsed returns the stored duration.
func TestJobError_Elapsed(t *testing.T) {
	e := &JobError{
		ID:      "id",
		Job:     &Job{},
		start:   time.Now(),
		elapsed: time.Millisecond,
	}

	assert.Equal(t, time.Millisecond, e.Elapsed())
}

// JobError.Error surfaces the causing error's message.
func TestJobError_Error(t *testing.T) {
	e := &JobError{
		ID:      "id",
		Job:     &Job{},
		start:   time.Now(),
		elapsed: time.Millisecond,
		Caused:  errors.New("error"),
	}

	assert.Equal(t, time.Millisecond, e.Elapsed())
	assert.Equal(t, "error", e.Error())
}

// PipelineError.Error surfaces the causing error's message.
func TestPipelineError_Error(t *testing.T) {
	e := &PipelineError{
		Pipeline: &Pipeline{},
		Caused:   errors.New("error"),
	}

	assert.Equal(t, "error", e.Error())
}
diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go
new file mode 100644
index 00000000..b747fcfd
--- /dev/null
+++ b/plugins/jobs/job.go
@@ -0,0 +1,38 @@
+package jobs
+
+import json "github.com/json-iterator/go"
+
// Handler handles job execution.
type Handler func(id string, j *Job) error

// ErrorHandler handles job execution errors.
type ErrorHandler func(id string, j *Job, err error)

// Job carries information about single job.
type Job struct {
	// Job contains name of job broker (usually PHP class).
	Job string `json:"job"`

	// Payload is string data (usually JSON) passed to Job broker.
	Payload string `json:"payload"`

	// Options contains set of PipelineOptions specific to job execution. Can be empty.
	Options *Options `json:"options,omitempty"`
}

// Body packs job payload into binary payload.
func (j *Job) Body() []byte {
	return []byte(j.Payload)
}

// Context packs job context (job, id) into binary payload.
func (j *Job) Context(id string) []byte {
	// marshaling a struct of two plain strings cannot fail,
	// so the error is deliberately ignored
	ctx, _ := json.Marshal(
		struct {
			ID  string `json:"id"`
			Job string `json:"job"`
		}{ID: id, Job: j.Job},
	)

	return ctx
}
diff --git a/plugins/jobs/job_options.go b/plugins/jobs/job_options.go
new file mode 100644
index 00000000..d4c6f0d2
--- /dev/null
+++ b/plugins/jobs/job_options.go
@@ -0,0 +1,70 @@
+package jobs
+
+import "time"
+
// Options carry information about how to handle given job.
type Options struct {
	// Pipeline manually specified pipeline.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay defines time duration to delay execution for. Defaults to none.
	Delay int `json:"delay,omitempty"`

	// Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
	// Minimum valuable value is 2.
	Attempts int `json:"maxAttempts,omitempty"`

	// RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
	RetryDelay int `json:"retryDelay,omitempty"`

	// Timeout defines for how long the broker should wait before treating the job as failed. Defaults to 30 min.
	Timeout int `json:"timeout,omitempty"`
}

// Merge fills every unset option of o from the given defaults.
func (o *Options) Merge(from *Options) {
	if o.Delay == 0 {
		o.Delay = from.Delay
	}

	if o.RetryDelay == 0 {
		o.RetryDelay = from.RetryDelay
	}

	if o.Timeout == 0 {
		o.Timeout = from.Timeout
	}

	if o.Attempts == 0 {
		o.Attempts = from.Attempts
	}

	if o.Pipeline == "" {
		o.Pipeline = from.Pipeline
	}
}

// CanRetry must return true if broker is allowed to re-run the job.
// Attempts of 1 and 0 have the identical effect: no retries.
func (o *Options) CanRetry(attempt int) bool {
	return attempt+1 < o.Attempts
}

// RetryDuration returns retry delay duration in a form of time.Duration.
func (o *Options) RetryDuration() time.Duration {
	return time.Duration(o.RetryDelay) * time.Second
}

// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
	return time.Duration(o.Delay) * time.Second
}

// TimeoutDuration returns timeout duration in a form of time.Duration.
func (o *Options) TimeoutDuration() time.Duration {
	if o.Timeout != 0 {
		return time.Duration(o.Timeout) * time.Second
	}

	return 30 * time.Minute
}
diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go
new file mode 100644
index 00000000..8caaa935
--- /dev/null
+++ b/plugins/jobs/job_options_test.go
@@ -0,0 +1,109 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// Zero attempts never allows a retry.
func TestOptions_CanRetry(t *testing.T) {
	opts := &Options{Attempts: 0}

	assert.False(t, opts.CanRetry(0))
	assert.False(t, opts.CanRetry(1))
}

// Attempts of 1 behaves the same as 0: no retries.
func TestOptions_CanRetry_SameValue(t *testing.T) {
	opts := &Options{Attempts: 1}

	assert.False(t, opts.CanRetry(0))
	assert.False(t, opts.CanRetry(1))
}

// Attempts of 2 allows exactly one retry.
func TestOptions_CanRetry_Value(t *testing.T) {
	opts := &Options{Attempts: 2}

	assert.True(t, opts.CanRetry(0))
	assert.False(t, opts.CanRetry(1))
	assert.False(t, opts.CanRetry(2))
}

// Attempts of 3 allows two retries.
func TestOptions_CanRetry_Value3(t *testing.T) {
	opts := &Options{Attempts: 3}

	assert.True(t, opts.CanRetry(0))
	assert.True(t, opts.CanRetry(1))
	assert.False(t, opts.CanRetry(2))
}

// Zero RetryDelay yields a zero duration.
func TestOptions_RetryDuration(t *testing.T) {
	opts := &Options{RetryDelay: 0}
	assert.Equal(t, time.Duration(0), opts.RetryDuration())
}

// RetryDelay is interpreted in seconds.
func TestOptions_RetryDuration2(t *testing.T) {
	opts := &Options{RetryDelay: 1}
	assert.Equal(t, time.Second, opts.RetryDuration())
}

// Zero Delay yields a zero duration.
func TestOptions_DelayDuration(t *testing.T) {
	opts := &Options{Delay: 0}
	assert.Equal(t, time.Duration(0), opts.DelayDuration())
}

// Delay is interpreted in seconds.
func TestOptions_DelayDuration2(t *testing.T) {
	opts := &Options{Delay: 1}
	assert.Equal(t, time.Second, opts.DelayDuration())
}

// Zero Timeout falls back to the 30 minute default.
func TestOptions_TimeoutDuration(t *testing.T) {
	opts := &Options{Timeout: 0}
	assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
}

// Timeout is interpreted in seconds.
func TestOptions_TimeoutDuration2(t *testing.T) {
	opts := &Options{Timeout: 1}
	assert.Equal(t, time.Second, opts.TimeoutDuration())
}

// Merge fills all unset fields from the given defaults.
func TestOptions_Merge(t *testing.T) {
	opts := &Options{}

	opts.Merge(&Options{
		Pipeline:   "pipeline",
		Delay:      2,
		Timeout:    1,
		Attempts:   1,
		RetryDelay: 1,
	})

	assert.Equal(t, "pipeline", opts.Pipeline)
	assert.Equal(t, 1, opts.Attempts)
	assert.Equal(t, 2, opts.Delay)
	assert.Equal(t, 1, opts.Timeout)
	assert.Equal(t, 1, opts.RetryDelay)
}

// Merge never overrides fields that are already set.
func TestOptions_MergeKeepOriginal(t *testing.T) {
	opts := &Options{
		Pipeline:   "default",
		Delay:      10,
		Timeout:    10,
		Attempts:   10,
		RetryDelay: 10,
	}

	opts.Merge(&Options{
		Pipeline:   "pipeline",
		Delay:      2,
		Timeout:    1,
		Attempts:   1,
		RetryDelay: 1,
	})

	assert.Equal(t, "default", opts.Pipeline)
	assert.Equal(t, 10, opts.Attempts)
	assert.Equal(t, 10, opts.Delay)
	assert.Equal(t, 10, opts.Timeout)
	assert.Equal(t, 10, opts.RetryDelay)
}
diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go
new file mode 100644
index 00000000..e1938eca
--- /dev/null
+++ b/plugins/jobs/job_test.go
@@ -0,0 +1,18 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
// Body returns the raw payload as bytes.
func TestJob_Body(t *testing.T) {
	j := &Job{Payload: "hello"}

	assert.Equal(t, []byte("hello"), j.Body())
}

// Context serializes the id and job name as JSON.
func TestJob_Context(t *testing.T) {
	j := &Job{Job: "job"}

	assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
}
diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go
new file mode 100644
index 00000000..58b76e33
--- /dev/null
+++ b/plugins/jobs/pipeline.go
@@ -0,0 +1,169 @@
+package jobs
+
+import (
+ "fmt"
+ "time"
+)
+
// Pipelines is list of Pipeline.
type Pipelines []*Pipeline

// initPipelines validates raw pipeline configs and stamps each with its map key as name.
func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
	out := make(Pipelines, 0, len(pipes))

	for name, pipe := range pipes {
		if pipe.Broker() == "" {
			// name the offending pipeline so the error is actionable
			return nil, fmt.Errorf("pipeline `%s` does not define broker", name)
		}

		p := pipe.With("name", name)
		out = append(out, &p)
	}

	return out, nil
}

// Reverse returns pipelines in reversed order.
func (ps Pipelines) Reverse() Pipelines {
	out := make(Pipelines, len(ps))

	for i, p := range ps {
		out[len(ps)-i-1] = p
	}

	return out
}

// Broker return pipelines associated with specific broker.
func (ps Pipelines) Broker(broker string) Pipelines {
	out := make(Pipelines, 0, len(ps))

	for _, p := range ps {
		if p.Broker() == broker {
			out = append(out, p)
		}
	}

	return out
}

// Names returns only pipelines with specified names, in the order the names are given.
func (ps Pipelines) Names(only ...string) Pipelines {
	out := make(Pipelines, 0, len(only))

	for _, name := range only {
		for _, p := range ps {
			if p.Name() == name {
				out = append(out, p)
			}
		}
	}

	return out
}

// Get returns pipeline by its name or nil when not found.
func (ps Pipelines) Get(name string) *Pipeline {
	// linear scan; pipeline lists are small
	for _, p := range ps {
		if p.Name() == name {
			return p
		}
	}

	return nil
}

// Pipeline defines pipeline options.
type Pipeline map[string]interface{}

// With returns a copy of the pipeline with the given value set. Immutable.
func (p Pipeline) With(name string, value interface{}) Pipeline {
	out := make(Pipeline, len(p)+1)
	for k, v := range p {
		out[k] = v
	}
	out[name] = value

	return out
}

// Name returns pipeline name.
func (p Pipeline) Name() string {
	return p.String("name", "")
}

// Broker associated with the pipeline.
func (p Pipeline) Broker() string {
	return p.String("broker", "")
}

// Has checks if value presented in pipeline.
func (p Pipeline) Has(name string) bool {
	_, ok := p[name]
	return ok
}

// Map must return nested map value or empty config.
func (p Pipeline) Map(name string) Pipeline {
	out := make(Pipeline)

	if value, ok := p[name]; ok {
		if m, ok := value.(map[string]interface{}); ok {
			for k, v := range m {
				out[k] = v
			}
		}
	}

	return out
}

// Bool must return option value as bool or return default value.
func (p Pipeline) Bool(name string, d bool) bool {
	if value, ok := p[name]; ok {
		if b, ok := value.(bool); ok {
			return b
		}
	}

	return d
}

// String must return option value as string or return default value.
func (p Pipeline) String(name string, d string) string {
	if value, ok := p[name]; ok {
		if s, ok := value.(string); ok {
			return s
		}
	}

	return d
}

// Integer must return option value as int or return default value.
func (p Pipeline) Integer(name string, d int) int {
	if value, ok := p[name]; ok {
		if i, ok := value.(int); ok {
			return i
		}
	}

	return d
}

// Duration must return option value as time.Duration (seconds) or return default value.
func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
	if value, ok := p[name]; ok {
		if sec, ok := value.(int); ok {
			return time.Second * time.Duration(sec)
		}
	}

	return d
}
diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go
new file mode 100644
index 00000000..b80e75d0
--- /dev/null
+++ b/plugins/jobs/pipeline_test.go
@@ -0,0 +1,89 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
// Map returns the nested option map, or an empty one for unknown keys.
func TestPipeline_Map(t *testing.T) {
	pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}

	assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
	assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
}

// String lookups work through nested maps as well.
func TestPipeline_MapString(t *testing.T) {
	pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}

	assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
	assert.Equal(t, "", pipe.Map("other").String("alias", ""))
}

// Bool returns the stored value or the default for missing keys.
func TestPipeline_Bool(t *testing.T) {
	pipe := Pipeline{"value": true}

	assert.Equal(t, true, pipe.Bool("value", false))
	assert.Equal(t, true, pipe.Bool("other", true))
}

// String returns the stored value or the default for missing keys.
func TestPipeline_String(t *testing.T) {
	pipe := Pipeline{"value": "value"}

	assert.Equal(t, "value", pipe.String("value", ""))
	assert.Equal(t, "value", pipe.String("other", "value"))
}

// Integer returns the stored value or the default for missing keys.
func TestPipeline_Integer(t *testing.T) {
	pipe := Pipeline{"value": 1}

	assert.Equal(t, 1, pipe.Integer("value", 0))
	assert.Equal(t, 1, pipe.Integer("other", 1))
}

// Duration interprets stored integers as seconds.
func TestPipeline_Duration(t *testing.T) {
	pipe := Pipeline{"value": 1}

	assert.Equal(t, time.Second, pipe.Duration("value", 0))
	assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
}

// Has reports key presence.
func TestPipeline_Has(t *testing.T) {
	pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}

	assert.Equal(t, true, pipe.Has("options"))
	assert.Equal(t, false, pipe.Has("other"))
}

// Names/Broker filtering and Reverse keep the expected element order.
func TestPipeline_FilterBroker(t *testing.T) {
	pipes := Pipelines{
		&Pipeline{"name": "first", "broker": "a"},
		&Pipeline{"name": "second", "broker": "a"},
		&Pipeline{"name": "third", "broker": "b"},
		&Pipeline{"name": "forth", "broker": "b"},
	}

	filtered := pipes.Names("first", "third")
	assert.True(t, len(filtered) == 2)

	assert.Equal(t, "a", filtered[0].Broker())
	assert.Equal(t, "b", filtered[1].Broker())

	filtered = pipes.Names("first", "third").Reverse()
	assert.True(t, len(filtered) == 2)

	assert.Equal(t, "a", filtered[1].Broker())
	assert.Equal(t, "b", filtered[0].Broker())

	filtered = pipes.Broker("a")
	assert.True(t, len(filtered) == 2)

	assert.Equal(t, "first", filtered[0].Name())
	assert.Equal(t, "second", filtered[1].Name())

	filtered = pipes.Broker("a").Reverse()
	assert.True(t, len(filtered) == 2)

	assert.Equal(t, "first", filtered[1].Name())
	assert.Equal(t, "second", filtered[0].Name())
}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..cc1ecd99
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,151 @@
+package jobs
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner/util"
+)
+
// rpcServer exposes the jobs service over RPC.
type rpcServer struct{ svc *Service }

// WorkerList contains list of workers.
type WorkerList struct {
	// Workers is list of workers.
	Workers []*util.State `json:"workers"`
}

// PipelineList contains list of pipeline stats.
type PipelineList struct {
	// Pipelines is list of pipeline stats.
	Pipelines []*Stat `json:"pipelines"`
}

// Push pushes the job and returns the assigned job id.
func (rpc *rpcServer) Push(j *Job, id *string) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	*id, err = rpc.svc.Push(j)
	return
}

// PushAsync pushes the job without waiting for the push result.
func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	*ok = true
	// fire-and-forget: the push error is discarded here.
	// NOTE(review): presumably failures surface via service events — confirm.
	go rpc.svc.Push(j)

	return
}

// Reset resets underlying RR worker pool and restarts all of its workers.
func (rpc *rpcServer) Reset(reset bool, w *string) error {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	*w = "OK"
	return rpc.svc.rr.Reset()
}

// Stop stops job consuming for a given pipeline.
func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	pipe := rpc.svc.cfg.pipelines.Get(pipeline)
	if pipe == nil {
		return fmt.Errorf("undefined pipeline `%s`", pipeline)
	}

	// nil exec/error pools stop the consuming
	if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
		return err
	}

	*w = "OK"
	return nil
}

// Resume resumes job consuming for a given pipeline.
func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	pipe := rpc.svc.cfg.pipelines.Get(pipeline)
	if pipe == nil {
		return fmt.Errorf("undefined pipeline `%s`", pipeline)
	}

	if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
		return err
	}

	*w = "OK"
	return nil
}

// StopAll stops job consuming for all pipelines.
func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) {
	if rpc.svc == nil || rpc.svc.rr == nil {
		return fmt.Errorf("jobs server is not running")
	}

	for _, pipe := range rpc.svc.cfg.pipelines {
		if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
			return err
		}
	}

	*w = "OK"
	return nil
}

// ResumeAll resumes job consuming for all pipelines.
func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	for _, pipe := range rpc.svc.cfg.pipelines {
		if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
			return err
		}
	}

	*w = "OK"
	return nil
}

// Workers returns list of pipelines workers and their stats.
func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	w.Workers, err = util.ServerState(rpc.svc.rr)
	return err
}

// Stat returns stats for every configured pipeline.
func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) {
	if rpc.svc == nil {
		return fmt.Errorf("jobs server is not running")
	}

	// reset the output before filling it
	*l = PipelineList{}
	for _, p := range rpc.svc.cfg.pipelines {
		stat, err := rpc.svc.Stat(p)
		if err != nil {
			return err
		}

		l.Pipelines = append(l.Pipelines, stat)
	}

	return err
}
diff --git a/plugins/jobs/rpc_test.go b/plugins/jobs/rpc_test.go
new file mode 100644
index 00000000..c70ef86f
--- /dev/null
+++ b/plugins/jobs/rpc_test.go
@@ -0,0 +1,657 @@
+package jobs
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "syscall"
+ "testing"
+)
+
+func TestRPC_StatPipeline(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, true, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_StatNonActivePipeline(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, false, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_StatPipelineWithUndefinedBroker(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"undefined"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ list := &PipelineList{}
+ assert.Error(t, cl.Call("jobs.Stat", true, &list))
+}
+
+func TestRPC_EnableConsuming(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ pipelineReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventPipeActive {
+ close(pipelineReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ assert.NoError(t, cl.Call("jobs.Resume", "default", nil))
+
+ <-pipelineReady
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, true, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_EnableConsumingUndefined(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5005"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+ ok := ""
+ assert.Error(t, cl.Call("jobs.Resume", "undefined", &ok))
+}
+
+func TestRPC_EnableConsumingUndefinedBroker(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5005"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"undefined"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+ ok := ""
+ assert.Error(t, cl.Call("jobs.Resume", "default", &ok))
+}
+
+func TestRPC_EnableConsumingAllUndefinedBroker(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5005"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"undefined"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+ ok := ""
+ assert.Error(t, cl.Call("jobs.ResumeAll", true, &ok))
+}
+
+func TestRPC_DisableConsuming(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ pipelineReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventPipeStopped {
+ close(pipelineReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ assert.NoError(t, cl.Call("jobs.Stop", "default", nil))
+
+ <-pipelineReady
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, false, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_DisableConsumingUndefined(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ ok := ""
+ assert.Error(t, cl.Call("jobs.Stop", "undefined", &ok))
+}
+
+func TestRPC_EnableAllConsuming(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ pipelineReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventPipeActive {
+ close(pipelineReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ assert.NoError(t, cl.Call("jobs.ResumeAll", true, nil))
+
+ <-pipelineReady
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, true, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_DisableAllConsuming(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ pipelineReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventPipeStopped {
+ close(pipelineReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ assert.NoError(t, cl.Call("jobs.StopAll", true, nil))
+
+ <-pipelineReady
+
+ list := &PipelineList{}
+ assert.NoError(t, cl.Call("jobs.Stat", true, &list))
+
+ assert.Len(t, list.Pipelines, 1)
+
+ assert.Equal(t, int64(0), list.Pipelines[0].Queue)
+ assert.Equal(t, false, list.Pipelines[0].Consuming)
+}
+
+func TestRPC_DoJob(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventJobOK {
+ close(jobReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ id := ""
+ assert.NoError(t, cl.Call("jobs.Push", &Job{
+ Job: "spiral.jobs.tests.local.job",
+ Payload: `{"data":100}`,
+ Options: &Options{},
+ }, &id))
+ assert.NoError(t, err)
+
+ <-jobReady
+
+ data, err := ioutil.ReadFile("tests/local.job")
+ assert.NoError(t, err)
+ defer syscall.Unlink("tests/local.job")
+
+ assert.Contains(t, string(data), id)
+}
+
+// TestRPC_NoOperationOnDeadServer verifies that every RPC method returns an
+// error when the jobs service is absent (nil svc in the rpcServer).
+func TestRPC_NoOperationOnDeadServer(t *testing.T) {
+ rc := &rpcServer{nil}
+
+ assert.Error(t, rc.Push(&Job{}, nil))
+ assert.Error(t, rc.Reset(true, nil))
+
+ assert.Error(t, rc.Stop("default", nil))
+ assert.Error(t, rc.StopAll(true, nil))
+
+ assert.Error(t, rc.Resume("default", nil))
+ assert.Error(t, rc.ResumeAll(true, nil))
+
+ assert.Error(t, rc.Workers(true, nil))
+ assert.Error(t, rc.Stat(true, nil))
+}
+
+func TestRPC_Workers(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("rpc", &rpc.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "rpc":{"listen":"tcp://:5004"},
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ list := &WorkerList{}
+ assert.NoError(t, cl.Call("jobs.Workers", true, &list))
+
+ assert.Len(t, list.Workers, 1)
+
+ pid := list.Workers[0].Pid
+ assert.NotEqual(t, 0, pid)
+
+ // reset
+ ok := ""
+ assert.NoError(t, cl.Call("jobs.Reset", true, &ok))
+
+ list = &WorkerList{}
+ assert.NoError(t, cl.Call("jobs.Workers", true, &list))
+
+ assert.Len(t, list.Workers, 1)
+
+ assert.NotEqual(t, list.Workers[0].Pid, pid)
+}
diff --git a/plugins/jobs/service.go b/plugins/jobs/service.go
new file mode 100644
index 00000000..bb7ce09c
--- /dev/null
+++ b/plugins/jobs/service.go
@@ -0,0 +1,327 @@
+package jobs
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/env"
+ "github.com/spiral/roadrunner/service/rpc"
+)
+
+// ID defines public service name.
+const ID = "jobs"
+
+// Service wraps the roadrunner container and manages the set of job brokers
+// and pipelines within it.
+type Service struct {
+ // Associated brokers, keyed by broker name from the config.
+ Brokers map[string]Broker
+
+ // brokers and routing config
+ cfg *Config
+
+ // environment, logger and listeners
+ env env.Environment
+ log *logrus.Logger
+ lsn []func(event int, ctx interface{})
+
+ // server and server controller; both are referenced by Attach/Init/Serve
+ // and therefore must be declared (they were commented out, which breaks
+ // compilation of this file)
+ rr *roadrunner.Server
+ cr roadrunner.Controller
+
+ // task balancer
+ execPool chan Handler
+
+ // non-zero while Serve is running; lets Stop shut down exactly once
+ serving int32
+
+ // nested container running all registered brokers
+ brokers service.Container
+
+ // consuming status per pipeline, guarded by mup
+ mup sync.Mutex
+ pipelines map[*Pipeline]bool
+}
+
+// Attach attaches a roadrunner controller; currently only one controller is
+// supported. It is handed to the rr server in Serve.
+func (svc *Service) Attach(ctr roadrunner.Controller) {
+ svc.cr = ctr
+}
+
+// AddListener attaches an event listener to the service; listeners receive
+// service, broker, pipeline and job events via throw.
+func (svc *Service) AddListener(l func(event int, ctx interface{})) {
+ svc.lsn = append(svc.lsn, l)
+}
+
+// Init configures the job service: hydrates the config, registers the RPC
+// endpoint, builds the worker exec pool, initializes all brokers in a nested
+// container and registers every pipeline with its broker.
+// Returns (false, err) on any configuration failure.
+func (svc *Service) Init(
+ cfg service.Config,
+ log *logrus.Logger,
+ env env.Environment,
+ rpc *rpc.Service,
+) (ok bool, err error) {
+ svc.cfg = &Config{}
+ if err := svc.cfg.Hydrate(cfg); err != nil {
+ return false, err
+ }
+
+ svc.env = env
+ svc.log = log
+
+ // RPC is optional; expose the jobs API only when the rpc service is present
+ if rpc != nil {
+ if err := rpc.Register(ID, &rpcServer{svc}); err != nil {
+ return false, err
+ }
+ }
+
+ // limit the number of parallel threads: the exec pool holds one Handler
+ // slot per configured worker
+ if svc.cfg.Workers.Command != "" {
+ svc.execPool = make(chan Handler, svc.cfg.Workers.Pool.NumWorkers)
+ for i := int64(0); i < svc.cfg.Workers.Pool.NumWorkers; i++ {
+ svc.execPool <- svc.exec
+ }
+
+ svc.rr = roadrunner.NewServer(svc.cfg.Workers)
+ }
+
+ // all pipelines start in the "not consuming" state; Serve flips the
+ // configured ones on
+ svc.pipelines = make(map[*Pipeline]bool)
+ for _, p := range svc.cfg.pipelines {
+ svc.pipelines[p] = false
+ }
+
+ // run all brokers in nested container
+ svc.brokers = service.NewContainer(log)
+ for name, b := range svc.Brokers {
+ svc.brokers.Register(name, b)
+ if ep, ok := b.(EventProvider); ok {
+ ep.Listen(svc.throw)
+ }
+ }
+
+ // init all broker configs
+ if err := svc.brokers.Init(svc.cfg); err != nil {
+ return false, err
+ }
+
+ // register all pipelines (per broker)
+ for name, b := range svc.Brokers {
+ for _, pipe := range svc.cfg.pipelines.Broker(name) {
+ if err := b.Register(pipe); err != nil {
+ return false, err
+ }
+ }
+ }
+
+ return true, nil
+}
+
+// Serve starts the local rr worker server (when one is configured), begins
+// consuming the pipelines listed in cfg.Consume and then serves all
+// registered brokers. Blocks until the broker container stops.
+func (svc *Service) Serve() error {
+ if svc.rr != nil {
+ if svc.env != nil {
+ if err := svc.env.Copy(svc.cfg.Workers); err != nil {
+ return err
+ }
+ }
+
+ // ensure that workers are aware of running within jobs
+ svc.cfg.Workers.SetEnv("rr_jobs", "true")
+ svc.rr.Listen(svc.throw)
+
+ if svc.cr != nil {
+ svc.rr.Attach(svc.cr)
+ }
+
+ if err := svc.rr.Start(); err != nil {
+ return err
+ }
+ defer svc.rr.Stop()
+
+ // start consuming on every pipeline named in the "consume" section
+ for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...) {
+ // start pipeline consuming
+ if err := svc.Consume(p, svc.execPool, svc.error); err != nil {
+ svc.Stop()
+
+ return err
+ }
+ }
+ }
+
+ // mark the service as serving so Stop performs a full shutdown only while
+ // Serve is active
+ atomic.StoreInt32(&svc.serving, 1)
+ defer atomic.StoreInt32(&svc.serving, 0)
+
+ return svc.brokers.Serve()
+}
+
+// Stop stops consuming on all configured pipelines (in reverse config order,
+// concurrently) and then stops the broker container. No-op unless Serve is
+// currently running.
+func (svc *Service) Stop() {
+ if atomic.LoadInt32(&svc.serving) == 0 {
+ return
+ }
+
+ // stop pipelines in parallel and wait for all before stopping brokers
+ wg := sync.WaitGroup{}
+ for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...).Reverse() {
+ wg.Add(1)
+
+ go func(p *Pipeline) {
+ defer wg.Done()
+ // nil handlers switch Consume into "stop consuming" mode
+ if err := svc.Consume(p, nil, nil); err != nil {
+ svc.throw(EventPipeError, &PipelineError{Pipeline: p, Caused: err})
+ }
+ }(p)
+ }
+
+ wg.Wait()
+ svc.brokers.Stop()
+}
+
+// Server returns the associated rr server; nil when no worker pool is
+// configured.
+func (svc *Service) Server() *roadrunner.Server {
+ return svc.rr
+}
+
+// Stat returns broker statistics for a single pipeline, annotated with the
+// pipeline/broker names and the service-tracked consuming status.
+func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error) {
+ b, ok := svc.Brokers[pipe.Broker()]
+ if !ok {
+ return nil, fmt.Errorf("undefined broker `%s`", pipe.Broker())
+ }
+
+ stat, err = b.Stat(pipe)
+ if err != nil {
+ return nil, err
+ }
+
+ stat.Pipeline = pipe.Name()
+ stat.Broker = pipe.Broker()
+
+ // consuming status is tracked by the service (under mup), not the broker
+ svc.mup.Lock()
+ stat.Consuming = svc.pipelines[pipe]
+ svc.mup.Unlock()
+
+ return stat, err
+}
+
+// Consume enables (non-nil execPool) or disables (nil execPool) consuming on
+// the given pipeline. Idempotent: enabling an already-consuming pipeline, or
+// disabling an idle one, returns nil without touching the broker.
+func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
+ svc.mup.Lock()
+
+ if execPool != nil {
+ if svc.pipelines[pipe] {
+ svc.mup.Unlock()
+ return nil
+ }
+
+ svc.throw(EventPipeConsume, pipe)
+ svc.pipelines[pipe] = true
+ } else {
+ if !svc.pipelines[pipe] {
+ svc.mup.Unlock()
+ return nil
+ }
+
+ svc.throw(EventPipeStop, pipe)
+ svc.pipelines[pipe] = false
+ }
+
+ broker, ok := svc.Brokers[pipe.Broker()]
+ if !ok {
+ svc.mup.Unlock()
+ return fmt.Errorf("undefined broker `%s`", pipe.Broker())
+ }
+ svc.mup.Unlock()
+
+ // the broker call is made outside the lock; on failure roll the consuming
+ // flag back and report the pipeline error
+ if err := broker.Consume(pipe, execPool, errHandler); err != nil {
+ svc.mup.Lock()
+ svc.pipelines[pipe] = false
+ svc.mup.Unlock()
+
+ svc.throw(EventPipeError, &PipelineError{Pipeline: pipe, Caused: err})
+
+ return err
+ }
+
+ if execPool != nil {
+ svc.throw(EventPipeActive, pipe)
+ } else {
+ svc.throw(EventPipeStopped, pipe)
+ }
+
+ return nil
+}
+
+// Push routes the job to a pipeline via the dispatch config, merges
+// pipeline-level options into the job and pushes it to the associated broker.
+// Returns the broker-assigned job id and emits a push-ok/push-error event.
+func (svc *Service) Push(job *Job) (string, error) {
+ pipe, pOpts, err := svc.cfg.MatchPipeline(job)
+ if err != nil {
+ return "", err
+ }
+
+ // pipeline options act as defaults merged into the job's own options
+ if pOpts != nil {
+ job.Options.Merge(pOpts)
+ }
+
+ broker, ok := svc.Brokers[pipe.Broker()]
+ if !ok {
+ return "", fmt.Errorf("undefined broker `%s`", pipe.Broker())
+ }
+
+ id, err := broker.Push(pipe, job)
+
+ if err != nil {
+ svc.throw(EventPushError, &JobError{Job: job, Caused: err})
+ } else {
+ svc.throw(EventPushOK, &JobEvent{ID: id, Job: job})
+ }
+
+ return id, err
+}
+
+// exec executes a single job on the local rr server, emitting start and
+// ok/error events with timing information. Make sure the service is started
+// before this handler is invoked.
+func (svc *Service) exec(id string, j *Job) error {
+ start := time.Now()
+ svc.throw(EventJobStart, &JobEvent{ID: id, Job: j, start: start})
+
+ // ignore response for now, possibly add more routing options
+ _, err := svc.rr.Exec(&roadrunner.Payload{
+ Body: j.Body(),
+ Context: j.Context(id),
+ })
+
+ if err == nil {
+ svc.throw(EventJobOK, &JobEvent{
+ ID: id,
+ Job: j,
+ start: start,
+ elapsed: time.Since(start),
+ })
+ } else {
+ svc.throw(EventJobError, &JobError{
+ ID: id,
+ Job: j,
+ Caused: err, start: start,
+ elapsed: time.Since(start),
+ })
+ }
+
+ return err
+}
+
+// error registers a dead job; can be used to move it to a fallback queue or
+// to log it. Currently a deliberate no-op.
+func (svc *Service) error(id string, j *Job, err error) {
+ // nothing for now, possibly route to another pipeline
+}
+
+// throw fans an event out to every registered listener and triggers a full
+// service shutdown when the underlying rr server reports a failure.
+func (svc *Service) throw(event int, ctx interface{}) {
+ for _, l := range svc.lsn {
+ l(event, ctx)
+ }
+
+ if event == roadrunner.EventServerFailure {
+ // underlying rr server is dead, stop everything
+ svc.Stop()
+ }
+}
diff --git a/plugins/jobs/service_test.go b/plugins/jobs/service_test.go
new file mode 100644
index 00000000..74781525
--- /dev/null
+++ b/plugins/jobs/service_test.go
@@ -0,0 +1,458 @@
+package jobs
+
+import (
+ "bytes"
+ "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/env"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "syscall"
+ "testing"
+)
+
+// viperConfig builds a service.Config from an inline JSON string.
+// Test helper; panics on malformed JSON.
+func viperConfig(cfg string) service.Config {
+ v := viper.New()
+ v.SetConfigType("json")
+
+ err := v.ReadConfig(bytes.NewBuffer([]byte(cfg)))
+ if err != nil {
+ panic(err)
+ }
+
+ return &configWrapper{v}
+}
+
+// configWrapper bridges a viper configuration tree to the service.Config
+// interface expected by the container.
+type configWrapper struct {
+ v *viper.Viper
+}
+
+// Get returns the nested config section (sub-map) for key, or nil when the
+// section is not present.
+func (w *configWrapper) Get(key string) service.Config {
+ sub := w.v.Sub(key)
+ if sub == nil {
+ return nil
+ }
+
+ return &configWrapper{sub}
+}
+
+// Unmarshal decodes the wrapped config data into the given struct.
+func (w *configWrapper) Unmarshal(out interface{}) error {
+ return w.v.Unmarshal(out)
+}
+
+// jobs fetches the registered jobs service from the container. Test helper;
+// assumes the service was registered under the "jobs" key.
+func jobs(container service.Container) *Service {
+ svc, _ := container.Get("jobs")
+ return svc.(*Service)
+}
+
+// TestService_Init verifies that the service initializes cleanly from a
+// minimal single-pipeline ephemeral configuration.
+func TestService_Init(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+	"jobs":{
+		"workers":{
+			"command": "php tests/consumer.php",
+			"pool.numWorkers": 1
+		},
+		"pipelines":{"default":{"broker":"ephemeral"}},
+		"dispatch": {
+			"spiral-jobs-tests-local-*.pipeline": "default"
+		},
+		"consume": ["default"]
+	}
+}`)))
+}
+
+func TestService_ServeStop(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("env", &env.Service{})
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ <-ready
+ c.Stop()
+}
+
+func TestService_ServeError(t *testing.T) {
+ l := logrus.New()
+ l.Level = logrus.FatalLevel
+
+ c := service.NewContainer(l)
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/bad-consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ assert.Error(t, c.Serve())
+}
+
+func TestService_GetPipeline(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ assert.Equal(t, "ephemeral", jobs(c).cfg.pipelines.Get("default").Broker())
+}
+
+func TestService_StatPipeline(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+ pipe := svc.cfg.pipelines.Get("default")
+
+ stat, err := svc.Stat(pipe)
+ assert.NoError(t, err)
+
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, true, stat.Consuming)
+}
+
+func TestService_StatNonConsumingPipeline(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+ pipe := svc.cfg.pipelines.Get("default")
+
+ stat, err := svc.Stat(pipe)
+ assert.NoError(t, err)
+
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, false, stat.Consuming)
+}
+
+func TestService_DoJob(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobReady := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventJobOK {
+ close(jobReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+
+ id, err := svc.Push(&Job{
+ Job: "spiral.jobs.tests.local.job",
+ Payload: `{"data":100}`,
+ Options: &Options{},
+ })
+ assert.NoError(t, err)
+
+ <-jobReady
+
+ data, err := ioutil.ReadFile("tests/local.job")
+ assert.NoError(t, err)
+ defer syscall.Unlink("tests/local.job")
+
+ assert.Contains(t, string(data), id)
+}
+
+func TestService_DoUndefinedJob(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+
+ _, err := svc.Push(&Job{
+ Job: "spiral.jobs.tests.undefined",
+ Payload: `{"data":100}`,
+ Options: &Options{},
+ })
+ assert.Error(t, err)
+}
+
+func TestService_DoJobIntoInvalidBroker(t *testing.T) {
+ l := logrus.New()
+ l.Level = logrus.FatalLevel
+
+ c := service.NewContainer(l)
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"undefined"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+
+ _, err := svc.Push(&Job{
+ Job: "spiral.jobs.tests.local.job",
+ Payload: `{"data":100}`,
+ Options: &Options{},
+ })
+ assert.Error(t, err)
+}
+
+func TestService_DoStatInvalidBroker(t *testing.T) {
+ l := logrus.New()
+ l.Level = logrus.FatalLevel
+
+ c := service.NewContainer(l)
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"undefined"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": []
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+
+ _, err := svc.Stat(svc.cfg.pipelines.Get("default"))
+ assert.Error(t, err)
+}
+
+func TestService_DoErrorJob(t *testing.T) {
+ c := service.NewContainer(logrus.New())
+ c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
+
+ assert.NoError(t, c.Init(viperConfig(`{
+ "jobs":{
+ "workers":{
+ "command": "php tests/consumer.php",
+ "pool.numWorkers": 1
+ },
+ "pipelines":{"default":{"broker":"ephemeral"}},
+ "dispatch": {
+ "spiral-jobs-tests-local-*.pipeline": "default"
+ },
+ "consume": ["default"]
+ }
+}`)))
+
+ ready := make(chan interface{})
+ jobReady := make(chan interface{})
+
+ var jobErr error
+ jobs(c).AddListener(func(event int, ctx interface{}) {
+ if event == EventBrokerReady {
+ close(ready)
+ }
+
+ if event == EventJobError {
+ jobErr = ctx.(error)
+ close(jobReady)
+ }
+ })
+
+ go func() { c.Serve() }()
+ defer c.Stop()
+ <-ready
+
+ svc := jobs(c)
+
+ _, err := svc.Push(&Job{
+ Job: "spiral.jobs.tests.local.errorJob",
+ Payload: `{"data":100}`,
+ Options: &Options{},
+ })
+ assert.NoError(t, err)
+
+ <-jobReady
+ assert.Error(t, jobErr)
+ assert.Contains(t, jobErr.Error(), "something is wrong")
+}
diff --git a/plugins/jobs/tests/.rr.yaml b/plugins/jobs/tests/.rr.yaml
new file mode 100644
index 00000000..2fd323db
--- /dev/null
+++ b/plugins/jobs/tests/.rr.yaml
@@ -0,0 +1,63 @@
+jobs:
+ # worker pool configuration
+ workers:
+ command: "php consumer.php"
+ pool:
+ numWorkers: 4
+
+ # rabbitmq and similar servers
+ amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+ beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+ sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+ # job destinations and options
+ dispatch:
+ spiral-jobs-tests-amqp-*.pipeline: amqp
+ spiral-jobs-tests-local-*.pipeline: local
+ spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
+ spiral-jobs-tests-sqs-*.pipeline: sqs
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ local:
+ broker: ephemeral
+
+ amqp:
+ broker: amqp
+ queue: default
+
+ beanstalk:
+ broker: beanstalk
+ tube: default
+
+ sqs:
+ broker: sqs
+ queue: default
+ declare:
+ MessageRetentionPeriod: 86400
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: ["local", "amqp", "beanstalk", "sqs"]
+
+metrics:
+ address: localhost:2112
+
+# monitors rr server(s)
+limit:
+ interval: 1
+ services:
+ jobs:
+ maxMemory: 100
+ TTL: 0
+ idleTTL: 0
+ execTTL: 60 \ No newline at end of file
diff --git a/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php b/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php
new file mode 100644
index 00000000..637c14d6
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Amqp;
+
+use Spiral\Jobs\Tests\BaseTest;
+
+class BrokerTest extends BaseTest
+{
+ public const JOB = Job::class;
+ public const ERROR_JOB = ErrorJob::class;
+}
diff --git a/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php b/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php
new file mode 100644
index 00000000..82b6e7e0
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php
@@ -0,0 +1,22 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Amqp;
+
+use Spiral\Jobs\JobHandler;
+
+class ErrorJob extends JobHandler
+{
+ public function invoke(string $id): void
+ {
+ throw new \Error('something is wrong');
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Amqp/Job.php b/plugins/jobs/tests/Jobs/Amqp/Job.php
new file mode 100644
index 00000000..2c6ad819
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Amqp/Job.php
@@ -0,0 +1,26 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Amqp;
+
+use Spiral\Jobs\JobHandler;
+
+class Job extends JobHandler
+{
+ public const JOB_FILE = __DIR__ . '/../../local.job';
+
+ public function invoke(string $id, array $payload): void
+ {
+ file_put_contents(self::JOB_FILE, json_encode(
+ $payload + compact('id')
+ ));
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/BaseTest.php b/plugins/jobs/tests/Jobs/BaseTest.php
new file mode 100644
index 00000000..67f280b5
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/BaseTest.php
@@ -0,0 +1,115 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Core\Container;
+use Spiral\Goridge\RPC;
+use Spiral\Goridge\SocketRelay;
+use Spiral\Jobs\Options;
+use Spiral\Jobs\Queue;
+use Spiral\Jobs\Registry\ContainerRegistry;
+
+abstract class BaseTest extends TestCase
+{
+ public const JOB = null;
+ public const ERROR_JOB = null;
+
+ private $job;
+ private $errorJob;
+
+ public function setUp(): void
+ {
+ $this->job = static::JOB;
+ $this->errorJob = static::ERROR_JOB;
+ }
+
+ protected function tearDown(): void
+ {
+ if (file_exists((static::JOB)::JOB_FILE)) {
+ unlink((static::JOB)::JOB_FILE);
+ }
+ }
+
+ public function testJob(): void
+ {
+ $jobs = $this->makeJobs();
+
+ $id = $jobs->push($this->job, ['data' => 100]);
+
+ $this->assertNotEmpty($id);
+
+ $this->waitForJob();
+ $this->assertFileExists($this->job::JOB_FILE);
+
+ $data = json_decode(file_get_contents($this->job::JOB_FILE), true);
+ $this->assertSame($id, $data['id']);
+ $this->assertSame(100, $data['data']);
+ }
+
+ public function testErrorJob(): void
+ {
+ $jobs = $this->makeJobs();
+
+ $id = $jobs->push($this->errorJob, ['data' => 100]);
+ $this->assertNotEmpty($id);
+ }
+
+ public function testDelayJob(): void
+ {
+ $jobs = $this->makeJobs();
+
+ $id = $jobs->push($this->job, ['data' => 100], Options::delayed(1));
+
+ $this->assertNotEmpty($id);
+
+ $this->assertTrue($this->waitForJob() > 1);
+ $this->assertFileExists($this->job::JOB_FILE);
+
+ $data = json_decode(file_get_contents($this->job::JOB_FILE), true);
+ $this->assertSame($id, $data['id']);
+ $this->assertSame(100, $data['data']);
+ }
+
+ /**
+ * @expectedException \Spiral\Jobs\Exception\JobException
+ */
+ public function testConnectionException(): void
+ {
+ $jobs = new Queue(
+ new RPC(new SocketRelay('localhost', 6002)),
+ new ContainerRegistry(new Container())
+ );
+
+ $jobs->push($this->job, ['data' => 100]);
+ }
+
+ public function makeJobs(): Queue
+ {
+ return new Queue(
+ new RPC(new SocketRelay('localhost', 6001)),
+ new ContainerRegistry(new Container())
+ );
+ }
+
+ private function waitForJob(): float
+ {
+ $start = microtime(true);
+ $try = 0;
+ while (!file_exists($this->job::JOB_FILE) && $try < 10) {
+ usleep(250000);
+ $try++;
+ }
+
+ return microtime(true) - $start;
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php b/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php
new file mode 100644
index 00000000..d1ea4682
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Beanstalk;
+
+use Spiral\Jobs\Tests\BaseTest;
+
+class BrokerTest extends BaseTest
+{
+ public const JOB = Job::class;
+ public const ERROR_JOB = ErrorJob::class;
+}
diff --git a/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php b/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php
new file mode 100644
index 00000000..c4349871
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php
@@ -0,0 +1,22 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Beanstalk;
+
+use Spiral\Jobs\JobHandler;
+
+class ErrorJob extends JobHandler
+{
+ public function invoke(string $id): void
+ {
+ throw new \Error('something is wrong');
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Beanstalk/Job.php b/plugins/jobs/tests/Jobs/Beanstalk/Job.php
new file mode 100644
index 00000000..f8bd541a
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Beanstalk/Job.php
@@ -0,0 +1,26 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Beanstalk;
+
+use Spiral\Jobs\JobHandler;
+
+class Job extends JobHandler
+{
+ public const JOB_FILE = __DIR__ . '/../../local.job';
+
+ public function invoke(string $id, array $payload): void
+ {
+ file_put_contents(self::JOB_FILE, json_encode(
+ $payload + compact('id')
+ ));
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Local/BrokerTest.php b/plugins/jobs/tests/Jobs/Local/BrokerTest.php
new file mode 100644
index 00000000..9ba83de6
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Local/BrokerTest.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Local;
+
+use Spiral\Jobs\Tests\BaseTest;
+
+class BrokerTest extends BaseTest
+{
+ public const JOB = Job::class;
+ public const ERROR_JOB = ErrorJob::class;
+}
diff --git a/plugins/jobs/tests/Jobs/Local/ErrorJob.php b/plugins/jobs/tests/Jobs/Local/ErrorJob.php
new file mode 100644
index 00000000..70b1365b
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Local/ErrorJob.php
@@ -0,0 +1,22 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Local;
+
+use Spiral\Jobs\JobHandler;
+
+class ErrorJob extends JobHandler
+{
+ public function invoke(string $id): void
+ {
+ throw new \Error('something is wrong');
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Local/Job.php b/plugins/jobs/tests/Jobs/Local/Job.php
new file mode 100644
index 00000000..2f5803c8
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Local/Job.php
@@ -0,0 +1,26 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Local;
+
+use Spiral\Jobs\JobHandler;
+
+class Job extends JobHandler
+{
+ public const JOB_FILE = __DIR__ . '/../../local.job';
+
+ public function invoke(string $id, array $payload): void
+ {
+ file_put_contents(self::JOB_FILE, json_encode(
+ $payload + compact('id')
+ ));
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/OptionsTest.php b/plugins/jobs/tests/Jobs/OptionsTest.php
new file mode 100644
index 00000000..5d00794e
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/OptionsTest.php
@@ -0,0 +1,34 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Jobs\Options;
+
+class OptionsTest extends TestCase
+{
+ public function testDelay(): void
+ {
+ $o = new Options();
+ $this->assertNull($o->getDelay());
+ $o = $o->withDelay(10);
+ $this->assertSame(10, $o->getDelay());
+ }
+
+ public function testPipeline(): void
+ {
+ $o = new Options();
+ $this->assertNull($o->getPipeline());
+ $o = $o->withPipeline('custom');
+ $this->assertSame('custom', $o->getPipeline());
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/RegistryTest.php b/plugins/jobs/tests/Jobs/RegistryTest.php
new file mode 100644
index 00000000..7abd75f7
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/RegistryTest.php
@@ -0,0 +1,43 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Core\Container;
+use Spiral\Jobs\Registry\ContainerRegistry;
+use Spiral\Jobs\Tests\Local\Job;
+
+class RegistryTest extends TestCase
+{
+ public function testMakeJob(): void
+ {
+ $factory = new ContainerRegistry(new Container());
+
+ $j = $factory->getHandler('spiral.jobs.tests.local.job');
+ $this->assertInstanceOf(Job::class, $j);
+
+ $this->assertSame(json_encode(['data' => 200]), $j->serialize(
+ 'spiral.jobs.tests.local.job',
+ ['data' => 200]
+ ));
+ }
+
+ /**
+ * @expectedException \Spiral\Jobs\Exception\JobException
+ */
+ public function testMakeUndefined(): void
+ {
+ $factory = new ContainerRegistry(new Container());
+
+ $factory->getHandler('spiral.jobs.undefined');
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/ShortCircuitTest.php b/plugins/jobs/tests/Jobs/ShortCircuitTest.php
new file mode 100644
index 00000000..c3306385
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/ShortCircuitTest.php
@@ -0,0 +1,90 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Core\Container;
+use Spiral\Jobs\Options;
+use Spiral\Jobs\Registry\ContainerRegistry;
+use Spiral\Jobs\ShortCircuit;
+use Spiral\Jobs\Tests\Local\ErrorJob;
+use Spiral\Jobs\Tests\Local\Job;
+
+class ShortCircuitTest extends TestCase
+{
+ protected function tearDown(): void
+ {
+ if (file_exists(Job::JOB_FILE)) {
+ unlink(Job::JOB_FILE);
+ }
+ }
+
+ public function testLocal(): void
+ {
+ $c = new ContainerRegistry(new Container());
+ $jobs = new ShortCircuit($c, $c);
+
+ $id = $jobs->push(Job::class, ['data' => 100]);
+
+ $this->assertNotEmpty($id);
+
+ $this->assertFileExists(Job::JOB_FILE);
+
+ $data = json_decode(file_get_contents(Job::JOB_FILE), true);
+ $this->assertSame($id, $data['id']);
+ $this->assertSame(100, $data['data']);
+ }
+
+ public function testLocalDelayed(): void
+ {
+ $c = new ContainerRegistry(new Container());
+ $jobs = new ShortCircuit($c, $c);
+
+ $t = microtime(true);
+ $id = $jobs->push(Job::class, ['data' => 100], Options::delayed(1));
+
+ $this->assertTrue(microtime(true) - $t >= 1);
+
+ $this->assertNotEmpty($id);
+
+ $this->assertFileExists(Job::JOB_FILE);
+
+ $data = json_decode(file_get_contents(Job::JOB_FILE), true);
+ $this->assertSame($id, $data['id']);
+ $this->assertSame(100, $data['data']);
+ }
+
+ /**
+ * @expectedException \Spiral\Jobs\Exception\JobException
+ */
+ public function testError(): void
+ {
+ $c = new ContainerRegistry(new Container());
+ $jobs = new ShortCircuit($c, $c);
+ $jobs->push(ErrorJob::class);
+ }
+
+ public function testLocalDelay(): void
+ {
+ $c = new ContainerRegistry(new Container());
+ $jobs = new ShortCircuit($c, $c);
+
+ $id = $jobs->push(Job::class, ['data' => 100], Options::delayed(1));
+ $this->assertNotEmpty($id);
+
+ $this->assertFileExists(Job::JOB_FILE);
+
+ $data = json_decode(file_get_contents(Job::JOB_FILE), true);
+ $this->assertSame($id, $data['id']);
+ $this->assertSame(100, $data['data']);
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php b/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php
new file mode 100644
index 00000000..ccaa96de
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php
@@ -0,0 +1,20 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Sqs;
+
+use Spiral\Jobs\Tests\BaseTest;
+
+class BrokerTest extends BaseTest
+{
+ public const JOB = Job::class;
+ public const ERROR_JOB = ErrorJob::class;
+}
diff --git a/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php b/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php
new file mode 100644
index 00000000..738b9f2b
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php
@@ -0,0 +1,22 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Sqs;
+
+use Spiral\Jobs\JobHandler;
+
+class ErrorJob extends JobHandler
+{
+ public function invoke(string $id): void
+ {
+ throw new \Error('something is wrong');
+ }
+}
diff --git a/plugins/jobs/tests/Jobs/Sqs/Job.php b/plugins/jobs/tests/Jobs/Sqs/Job.php
new file mode 100644
index 00000000..e22483a8
--- /dev/null
+++ b/plugins/jobs/tests/Jobs/Sqs/Job.php
@@ -0,0 +1,26 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Jobs\Tests\Sqs;
+
+use Spiral\Jobs\JobHandler;
+
+class Job extends JobHandler
+{
+ public const JOB_FILE = __DIR__ . '/../../local.job';
+
+ public function invoke(string $id, array $payload): void
+ {
+ file_put_contents(self::JOB_FILE, json_encode(
+ $payload + compact('id')
+ ));
+ }
+}
diff --git a/plugins/jobs/tests/bootstrap.php b/plugins/jobs/tests/bootstrap.php
new file mode 100644
index 00000000..b25fdc9d
--- /dev/null
+++ b/plugins/jobs/tests/bootstrap.php
@@ -0,0 +1,16 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+error_reporting(E_ALL | E_STRICT);
+ini_set('display_errors', 'stderr');
+
+//Composer
+require dirname(__DIR__) . '/vendor_php/autoload.php';
diff --git a/plugins/jobs/tests/consumer.php b/plugins/jobs/tests/consumer.php
new file mode 100644
index 00000000..ed56edff
--- /dev/null
+++ b/plugins/jobs/tests/consumer.php
@@ -0,0 +1,22 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+use Spiral\Core\Container;
+use Spiral\Goridge;
+use Spiral\Jobs;
+use Spiral\RoadRunner;
+
+require 'bootstrap.php';
+
+$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+
+$consumer = new Jobs\Consumer(new Jobs\Registry\ContainerRegistry(new Container()));
+$consumer->serve($rr);
diff --git a/plugins/jobs/tests/docker-compose.yml b/plugins/jobs/tests/docker-compose.yml
new file mode 100644
index 00000000..7b88c9cf
--- /dev/null
+++ b/plugins/jobs/tests/docker-compose.yml
@@ -0,0 +1,22 @@
+version: "3"
+
+services:
+ beanstalk:
+ image: schickling/beanstalkd
+ ports:
+ - "11300:11300"
+
+ sqs:
+ image: vsouza/sqs-local
+ ports:
+ - "9324:9324"
+
+ rabbitmq:
+ image: rabbitmq:3-management
+ environment:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ RABBITMQ_DEFAULT_VHOST: /
+ ports:
+ - "15672:15672"
+ - "5672:5672" \ No newline at end of file