diff options
78 files changed, 10760 insertions, 0 deletions
diff --git a/plugins/jobs/broker.go b/plugins/jobs/broker.go new file mode 100644 index 00000000..0066a4f1 --- /dev/null +++ b/plugins/jobs/broker.go @@ -0,0 +1,47 @@ +package jobs + +// Broker manages set of pipelines and provides ability to push jobs into them. +type Broker interface { + // Register broker pipeline. + Register(pipe *Pipeline) error + + // Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before + // the service is started! + Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error + + // Push job into the worker. + Push(pipe *Pipeline, j *Job) (string, error) + + // Stat must fetch statistics about given pipeline or return error. + Stat(pipe *Pipeline) (stat *Stat, err error) +} + +// EventProvider defines the ability to throw events for the broker. +type EventProvider interface { + // Listen attaches the even listener. + Listen(lsn func(event int, ctx interface{})) +} + +// Stat contains information about pipeline. +type Stat struct { + // Pipeline name. + Pipeline string + + // Broken is name of associated broker. + Broker string + + // InternalName defines internal broker specific pipeline name. + InternalName string + + // Consuming indicates that pipeline is pipelines jobs. + Consuming bool + + // testQueue defines number of pending jobs. + Queue int64 + + // Active defines number of jobs which are currently being processed. + Active int64 + + // Delayed defines number of jobs which are being processed. + Delayed int64 +} diff --git a/plugins/jobs/broker/amqp/broker.go b/plugins/jobs/broker/amqp/broker.go new file mode 100644 index 00000000..b47d83ee --- /dev/null +++ b/plugins/jobs/broker/amqp/broker.go @@ -0,0 +1,216 @@ +package amqp + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" +) + +// Broker represents AMQP broker. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + publish *chanPool + consume *chanPool + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures AMQP job broker (always 2 connections). +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.publish.Close() + + if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.consume.Close() + + for _, q := range b.queues { + err := q.declare(b.publish, q.name, q.key, nil) + if err != nil { + b.mu.Unlock() + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.publish, b.consume) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.publish != nil && q.execPool != nil { + if q.execPool != nil { + go q.serve(b.publish, b.consume) + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { + return "", err + } + + return id.String(), nil +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + queue, err := q.inspect(b.publish) + if err != nil { + return nil, err + } + + // this the closest approximation we can get for now + return &jobs.Stat{ + InternalName: queue.Name, + Queue: int64(queue.Messages), + Active: int64(atomic.LoadInt32(&q.running)), + }, nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/amqp/broker_test.go b/plugins/jobs/broker/amqp/broker_test.go new file mode 100644 index 00000000..66078099 --- /dev/null +++ b/plugins/jobs/broker/amqp/broker_test.go @@ -0,0 +1,419 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "queue": "rr-queue", + "exchange": "rr-exchange", + "prefetch": 1, + } + + cfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +var ( + fanoutPipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "fanout", + "queue": "fanout-queue", + "exchange": "fanout-exchange", + "exchange-type": "fanout", + "prefetch": 1, + } + + fanoutCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_BadPipeline(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "exchange": "rr-exchange", + "prefetch": 1, + })) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_CantStart(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "amqp://guest:guest@localhost:15672/", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal() + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_Queue_RoutingKey(t *testing.T) { + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") +} + +func TestBroker_Register_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Queue_ExchangeType(t *testing.T) { + pipeWithKey := pipe.With("exchange-type", "direct") + + assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") +} + +func TestBroker_Register_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "fanout") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Register_With_WrongExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "xxx") + + assert.Error(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(fanoutCfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := fanoutPipe.With("exchange-type", "fanout") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} diff --git a/plugins/jobs/broker/amqp/config.go b/plugins/jobs/broker/amqp/config.go new file mode 100644 index 00000000..0ed3a50e --- /dev/null +++ b/plugins/jobs/broker/amqp/config.go @@ -0,0 +1,39 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("AMQP address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to redial +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} diff --git a/plugins/jobs/broker/amqp/config_test.go b/plugins/jobs/broker/amqp/config_test.go new file mode 100644 index 00000000..1abbb55d --- /dev/null +++ b/plugins/jobs/broker/amqp/config_test.go @@ -0,0 +1,27 @@ +package amqp + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/broker/amqp/conn.go b/plugins/jobs/broker/amqp/conn.go new file mode 100644 index 00000000..be747776 --- /dev/null +++ b/plugins/jobs/broker/amqp/conn.go @@ -0,0 +1,232 @@ +package amqp + +import ( + "fmt" + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" + "sync" + "time" +) + +// manages set of AMQP channels +type chanPool struct { + // timeout to backoff redial + tout time.Duration + url string + + mu *sync.Mutex + + conn *amqp.Connection + channels map[string]*channel + wait chan interface{} + connected chan interface{} +} + +// manages single channel +type channel struct { + ch *amqp.Channel + // todo unused + //consumer string + confirm chan amqp.Confirmation + signal chan error +} + +// newConn creates new watched AMQP connection +func newConn(url string, tout time.Duration) (*chanPool, error) { + conn, err := dial(url) + if err != nil { + return nil, err + } + + cp := &chanPool{ + url: url, + tout: tout, + conn: conn, + mu: &sync.Mutex{}, + channels: make(map[string]*channel), + wait: make(chan interface{}), + connected: make(chan interface{}), + } + + close(cp.connected) + go cp.watch() + return cp, nil +} + +// dial dials to AMQP. +func dial(url string) (*amqp.Connection, error) { + return amqp.Dial(url) +} + +// Close gracefully closes all underlying channels and connection. +func (cp *chanPool) Close() error { + cp.mu.Lock() + + close(cp.wait) + if cp.channels == nil { + return fmt.Errorf("connection is dead") + } + + // close all channels and consume + var wg sync.WaitGroup + for _, ch := range cp.channels { + wg.Add(1) + + go func(ch *channel) { + defer wg.Done() + cp.closeChan(ch, nil) + }(ch) + } + cp.mu.Unlock() + + wg.Wait() + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn != nil { + return cp.conn.Close() + } + + return nil +} + +// waitConnected waits till connection is connected again or eventually closed. +// must only be invoked after connection error has been delivered to channel.signal. +func (cp *chanPool) waitConnected() chan interface{} { + cp.mu.Lock() + defer cp.mu.Unlock() + + return cp.connected +} + +// watch manages connection state and reconnects if needed +func (cp *chanPool) watch() { + for { + select { + case <-cp.wait: + // connection has been closed + return + // here we are waiting for the errors from amqp connection + case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): + cp.mu.Lock() + // clear connected, since connections are dead + cp.connected = make(chan interface{}) + + // broadcast error to all consume to let them for the tryReconnect + for _, ch := range cp.channels { + ch.signal <- err + } + + // disable channel allocation while server is dead + cp.conn = nil + cp.channels = nil + + // initialize the backoff + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cp.tout + cp.mu.Unlock() + + // reconnect function + reconnect := func() error { + cp.mu.Lock() + conn, err := dial(cp.url) + if err != nil { + // still failing + fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) + cp.mu.Unlock() + return err + } + + // TODO ADD LOGGING + fmt.Println("------amqp successfully redialed------") + + // here we are reconnected + // replace the connection + cp.conn = conn + // re-init the channels + cp.channels = make(map[string]*channel) + cp.mu.Unlock() + return nil + } + + // start backoff retry + errb := backoff.Retry(reconnect, expb) + if errb != nil { + fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) + // reconnection failed + close(cp.connected) + return + } + close(cp.connected) + } + } +} + +// channel allocates new channel on amqp connection +func (cp *chanPool) channel(name string) (*channel, error) { + cp.mu.Lock() + dead := cp.conn == nil + cp.mu.Unlock() + + if dead { + // wait for connection restoration (doubled the timeout duration) + select { + case <-time.NewTimer(cp.tout * 2).C: + return nil, fmt.Errorf("connection is dead") + case <-cp.connected: + // connected + } + } + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn == nil { + return nil, fmt.Errorf("connection has been closed") + } + + if ch, ok := cp.channels[name]; ok { + return ch, nil + } + + // we must create new channel + ch, err := cp.conn.Channel() + if err != nil { + return nil, err + } + + // Enable publish confirmations + if err = ch.Confirm(false); err != nil { + return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) + } + + // we expect that every allocated channel would have listener on signal + // this is not true only in case of pure producing channels + cp.channels[name] = &channel{ + ch: ch, + confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), + signal: make(chan error, 1), + } + + return cp.channels[name], nil +} + +// closeChan gracefully closes and removes channel allocation. +func (cp *chanPool) closeChan(c *channel, err error) error { + cp.mu.Lock() + defer cp.mu.Unlock() + + go func() { + c.signal <- nil + c.ch.Close() + }() + + for name, ch := range cp.channels { + if ch == c { + delete(cp.channels, name) + } + } + + return err +} diff --git a/plugins/jobs/broker/amqp/consume_test.go b/plugins/jobs/broker/amqp/consume_test.go new file mode 100644 index 00000000..28999c36 --- /dev/null +++ b/plugins/jobs/broker/amqp/consume_test.go @@ -0,0 +1,258 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + start := time.Now() + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 3*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/amqp/durability_test.go b/plugins/jobs/broker/amqp/durability_test.go new file mode 100644 index 00000000..00d62c51 --- /dev/null +++ b/plugins/jobs/broker/amqp/durability_test.go @@ -0,0 +1,728 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5673/", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:5673", + upstream: "localhost:5672", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2_2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // start when connection is dead + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(false) + + _, serr := b.Stat(pipe) + assert.Error(t, serr) + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + + if j.Payload == "kill" && len(done) == 0 { + proxy.reset(true) + } + + mu.Lock() + defer mu.Unlock() + done[id] = true + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/broker/amqp/job.go b/plugins/jobs/broker/amqp/job.go new file mode 100644 index 00000000..bd559715 --- /dev/null +++ b/plugins/jobs/broker/amqp/job.go @@ -0,0 +1,56 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" +) + +// pack job metadata into headers +func pack(id string, attempt int, j *jobs.Job) amqp.Table { + return amqp.Table{ + "rr-id": id, + "rr-job": j.Job, + "rr-attempt": int64(attempt), + "rr-maxAttempts": int64(j.Options.Attempts), + "rr-timeout": int64(j.Options.Timeout), + "rr-delay": int64(j.Options.Delay), + "rr-retryDelay": int64(j.Options.RetryDelay), + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { + j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} + + if _, ok := d.Headers["rr-id"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") + } + + if _, ok := d.Headers["rr-attempt"].(int64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") + } + + if _, ok := d.Headers["rr-job"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") + } + j.Job = d.Headers["rr-job"].(string) + + if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { + j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) + } + + if _, ok := d.Headers["rr-timeout"].(int64); ok { + j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) + } + + if _, ok := d.Headers["rr-delay"].(int64); ok { + j.Options.Delay = int(d.Headers["rr-delay"].(int64)) + } + + if _, ok := d.Headers["rr-retryDelay"].(int64); ok { + j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) + } + + return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil +} diff --git a/plugins/jobs/broker/amqp/job_test.go b/plugins/jobs/broker/amqp/job_test.go new file mode 100644 index 00000000..24ca453b --- /dev/null +++ b/plugins/jobs/broker/amqp/job_test.go @@ -0,0 +1,29 @@ +package amqp + +import ( + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack_Errors(t *testing.T) { + _, _, _, err := unpack(amqp.Delivery{ + Headers: map[string]interface{}{}, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + }, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + "rr-attempt": int64(0), + }, + }) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/amqp/queue.go b/plugins/jobs/broker/amqp/queue.go new file mode 100644 index 00000000..6ef5f20f --- /dev/null +++ b/plugins/jobs/broker/amqp/queue.go @@ -0,0 +1,302 @@ +package amqp + +import ( + "errors" + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" + "os" + "sync" + "sync/atomic" + "time" +) + +type ExchangeType string + +const ( + Direct ExchangeType = "direct" + Fanout ExchangeType = "fanout" + Topic ExchangeType = "topic" + Headers ExchangeType = "headers" +) + +func (et ExchangeType) IsValid() error { + switch et { + case Direct, Fanout, Topic, Headers: + return nil + } + return errors.New("unknown exchange-type") +} + +func (et ExchangeType) String() string { + switch et { + case Direct, Fanout, Topic, Headers: + return string(et) + default: + return "direct" + } +} + + +type queue struct { + active int32 + pipe *jobs.Pipeline + exchange string + exchangeType ExchangeType + name, key string + consumer string + + // active consuming channel + muc sync.Mutex + cc *channel + + // queue events + lsn func(event int, ctx interface{}) + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + running int32 + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +// newQueue creates new queue wrapper for AMQP. +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") + } + + exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) + + err := exchangeType.IsValid() + if err != nil { + return nil, fmt.Errorf(err.Error()) + } + + return &queue{ + exchange: pipe.String("exchange", "amqp.direct"), + exchangeType: exchangeType, + name: pipe.String("queue", ""), + key: pipe.String("routing-key", pipe.String("queue", "")), + consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), + pipe: pipe, + lsn: lsn, + }, nil +} + +// serve consumes queue +func (q *queue) serve(publish, consume *chanPool) { + atomic.StoreInt32(&q.active, 1) + + for { + <-consume.waitConnected() + if atomic.LoadInt32(&q.active) == 0 { + // stopped + return + } + + delivery, cc, err := q.consume(consume) + if err != nil { + q.report(err) + continue + } + + q.muc.Lock() + q.cc = cc + q.muc.Unlock() + + for d := range delivery { + q.muw.Lock() + q.wg.Add(1) + q.muw.Unlock() + + atomic.AddInt32(&q.running, 1) + h := <-q.execPool + + go func(h jobs.Handler, d amqp.Delivery) { + err := q.do(publish, h, d) + + atomic.AddInt32(&q.running, ^int32(0)) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, d) + } + } +} + +func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { + // allocate channel for the consuming + if cc, err = consume.channel(q.name); err != nil { + return nil, nil, err + } + + if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) + if err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + // do i like it? + go func(consume *chanPool) { + for err := range cc.signal { + consume.closeChan(cc, err) + return + } + }(consume) + + return delivery, cc, err +} + +func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { + id, attempt, j, err := unpack(d) + if err != nil { + q.report(err) + return d.Nack(false, false) + } + err = h(id, j) + + if err == nil { + return d.Ack(false) + } + + // failed + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return d.Nack(false, false) + } + + // retry as new j (to accommodate attempt number and new delay) + if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { + q.report(err) + return d.Nack(false, true) + } + + return d.Ack(false) +} + +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + q.muc.Lock() + if q.cc != nil { + // gracefully stopped consuming + q.report(q.cc.ch.Cancel(q.consumer, true)) + } + q.muc.Unlock() + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// publish message to queue or to delayed queue. +func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + qKey := q.key + + if delay != 0 { + delayMs := int64(delay.Seconds() * 1000) + qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) + qKey = qName + + err := q.declare(cp, qName, qName, amqp.Table{ + "x-dead-letter-exchange": q.exchange, + "x-dead-letter-routing-key": q.name, + "x-message-ttl": delayMs, + "x-expires": delayMs * 2, + }) + + if err != nil { + return err + } + } + + err = c.ch.Publish( + q.exchange, // exchange + qKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/octet-stream", + Body: j.Body(), + DeliveryMode: amqp.Persistent, + Headers: pack(id, attempt, j), + }, + ) + + if err != nil { + return cp.closeChan(c, err) + } + + confirmed, ok := <-c.confirm + if ok && confirmed.Ack { + return nil + } + + return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) +} + +// declare queue and binding to it +func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) + if err != nil { + return cp.closeChan(c, err) + } + + err = c.ch.QueueBind(queue, key, q.exchange, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + // keep channel open + return err +} + +// inspect the queue +func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { + c, err := cp.channel("stat") + if err != nil { + return nil, err + } + + queue, err := c.ch.QueueInspect(q.name) + if err != nil { + return nil, cp.closeChan(c, err) + } + + // keep channel open + return &queue, err +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/broker/amqp/stat_test.go b/plugins/jobs/broker/amqp/stat_test.go new file mode 100644 index 00000000..ef19746c --- /dev/null +++ b/plugins/jobs/broker/amqp/stat_test.go @@ -0,0 +1,63 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + wg := &sync.WaitGroup{} + wg.Add(1) + exec <- func(id string, j *jobs.Job) error { + defer wg.Done() + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + return nil + } + + wg.Wait() + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} diff --git a/plugins/jobs/broker/beanstalk/broker.go b/plugins/jobs/broker/beanstalk/broker.go new file mode 100644 index 00000000..dc3ea518 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/broker.go @@ -0,0 +1,185 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run consume using Broker service. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + conn *conn + tubes map[*jobs.Pipeline]*tube +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init(cfg *Config) (bool, error) { + b.cfg = cfg + b.tubes = make(map[*jobs.Pipeline]*tube) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.tubes[pipe]; ok { + return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) + } + + t, err := newTube(pipe, b.throw) + if err != nil { + return err + } + + b.tubes[pipe] = t + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.conn, err = b.cfg.newConn(); err != nil { + return err + } + defer b.conn.Close() + + for _, t := range b.tubes { + tt := t + if tt.execPool != nil { + go tt.serve(b.cfg) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, t := range b.tubes { + t.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + t.stop() + + t.execPool = execPool + t.errHandler = errHandler + + if b.conn != nil { + tt := t + if tt.execPool != nil { + go tt.serve(connFactory(b.cfg)) + } + } + + return nil +} + +// Push data into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + t := b.tube(pipe) + if t == nil { + return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + data, err := pack(j) + if err != nil { + return "", err + } + + return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + t := b.tube(pipe) + if t == nil { + return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + return t.stat(b.conn) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) tube(pipe *jobs.Pipeline) *tube { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return nil + } + + return t +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/beanstalk/broker_test.go b/plugins/jobs/broker/beanstalk/broker_test.go new file mode 100644 index 00000000..cd2132af --- /dev/null +++ b/plugins/jobs/broker/beanstalk/broker_test.go @@ -0,0 +1,276 @@ +package beanstalk + +import ( + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + "tube": "test", + } + + cfg = &Config{ + Addr: "tcp://localhost:11300", + } +) + +func init() { + conn, err := beanstalk.Dial("tcp", "localhost:11300") + if err != nil { + panic(err) + } + defer conn.Close() + + t := beanstalk.Tube{Name: "testTube", Conn: conn} + + for { + id, _, err := t.PeekReady() + if id == 0 || err != nil { + break + } + + if err := conn.Delete(id); err != nil { + panic(err) + } + } +} + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Register_Invalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + })) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Error(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "tcp://localhost:11399", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/beanstalk/config.go b/plugins/jobs/broker/beanstalk/config.go new file mode 100644 index 00000000..3e48a2d7 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/config.go @@ -0,0 +1,50 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "strings" + "time" +) + +// Config defines beanstalk broker configuration. +type Config struct { + // Addr of beanstalk server. + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("beanstalk address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// size creates new rpc socket Listener. +func (c *Config) newConn() (*conn, error) { + dsn := strings.Split(c.Addr, "://") + if len(dsn) != 2 { + return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") + } + + return newConn(dsn[0], dsn[1], c.TimeoutDuration()) +} diff --git a/plugins/jobs/broker/beanstalk/config_test.go b/plugins/jobs/broker/beanstalk/config_test.go new file mode 100644 index 00000000..4ba08a04 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/config_test.go @@ -0,0 +1,47 @@ +package beanstalk + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func TestConfig_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"addr":"tcp"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} + +func TestConfig_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/beanstalk/conn.go b/plugins/jobs/broker/beanstalk/conn.go new file mode 100644 index 00000000..7aba6bbb --- /dev/null +++ b/plugins/jobs/broker/beanstalk/conn.go @@ -0,0 +1,180 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "strings" + "sync" + "time" +) + +var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} + +// creates new connections +type connFactory interface { + newConn() (*conn, error) +} + +// conn protects allocation for one connection between +// threads and provides reconnecting capabilities. +type conn struct { + tout time.Duration + conn *beanstalk.Conn + alive bool + free chan interface{} + dead chan interface{} + stop chan interface{} + lock *sync.Cond +} + +// creates new beanstalk connection and reconnect watcher. +func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { + cn = &conn{ + tout: tout, + alive: true, + free: make(chan interface{}, 1), + dead: make(chan interface{}, 1), + stop: make(chan interface{}), + lock: sync.NewCond(&sync.Mutex{}), + } + + cn.conn, err = beanstalk.Dial(network, addr) + if err != nil { + return nil, err + } + + go cn.watch(network, addr) + + return cn, nil +} + +// reset the connection and reconnect watcher. +func (cn *conn) Close() error { + cn.lock.L.Lock() + defer cn.lock.L.Unlock() + + close(cn.stop) + for cn.alive { + cn.lock.Wait() + } + + return nil +} + +// acquire connection instance or return error in case of timeout. When mandratory set to true +// timeout won't be applied. +func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { + // do not apply timeout on mandatory connections + if mandatory { + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + } + } + + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + default: + // *2 to handle commands called right after the connection reset + tout := time.NewTimer(cn.tout * 2) + select { + case <-cn.stop: + tout.Stop() + return nil, fmt.Errorf("connection closed") + case <-cn.free: + tout.Stop() + return cn.conn, nil + case <-tout.C: + return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) + } + } +} + +// release acquired connection. +func (cn *conn) release(err error) error { + if isConnError(err) { + // reconnect is required + cn.dead <- err + } else { + cn.free <- nil + } + + return err +} + +// watch and reconnect if dead +func (cn *conn) watch(network, addr string) { + cn.free <- nil + t := time.NewTicker(WatchThrottleLimit) + defer t.Stop() + for { + select { + case <-cn.dead: + // simple throttle limiter + <-t.C + // try to reconnect + // TODO add logging here + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cn.tout + + reconnect := func() error { + conn, err := beanstalk.Dial(network, addr) + if err != nil { + fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) + return err + } + + // TODO ADD LOGGING + fmt.Println("------beanstalk successfully redialed------") + + cn.conn = conn + cn.free <- nil + return nil + } + + err := backoff.Retry(reconnect, expb) + if err != nil { + fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) + cn.dead <- nil + } + + case <-cn.stop: + cn.lock.L.Lock() + select { + case <-cn.dead: + case <-cn.free: + } + + // stop underlying connection + cn.conn.Close() + cn.alive = false + cn.lock.Signal() + + cn.lock.L.Unlock() + + return + } + } +} + +// isConnError indicates that error is related to dead socket. +func isConnError(err error) bool { + if err == nil { + return false + } + + for _, errStr := range connErrors { + // golang... + if strings.Contains(err.Error(), errStr) { + return true + } + } + + return false +} diff --git a/plugins/jobs/broker/beanstalk/constants.go b/plugins/jobs/broker/beanstalk/constants.go new file mode 100644 index 00000000..84be305e --- /dev/null +++ b/plugins/jobs/broker/beanstalk/constants.go @@ -0,0 +1,6 @@ +package beanstalk + +import "time" + +// WatchThrottleLimit is used to limit reconnection occurrence in watch function +const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/broker/beanstalk/consume_test.go b/plugins/jobs/broker/beanstalk/consume_test.go new file mode 100644 index 00000000..b16866ae --- /dev/null +++ b/plugins/jobs/broker/beanstalk/consume_test.go @@ -0,0 +1,242 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/beanstalk/durability_test.go b/plugins/jobs/broker/beanstalk/durability_test.go new file mode 100644 index 00000000..499a5206 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/durability_test.go @@ -0,0 +1,575 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "tcp://localhost:11301", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:11301", + upstream: "localhost:11300", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // reoccuring + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(true) + + // auto-reconnect + _, serr = b.Stat(pipe) + assert.NoError(t, serr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/broker/beanstalk/job.go b/plugins/jobs/broker/beanstalk/job.go new file mode 100644 index 00000000..fd9c8c3c --- /dev/null +++ b/plugins/jobs/broker/beanstalk/job.go @@ -0,0 +1,24 @@ +package beanstalk + +import ( + "bytes" + "encoding/gob" + "github.com/spiral/jobs/v2" +) + +func pack(j *jobs.Job) ([]byte, error) { + b := new(bytes.Buffer) + err := gob.NewEncoder(b).Encode(j) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func unpack(data []byte) (*jobs.Job, error) { + j := &jobs.Job{} + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) + + return j, err +} diff --git a/plugins/jobs/broker/beanstalk/sock.bean b/plugins/jobs/broker/beanstalk/sock.bean new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/plugins/jobs/broker/beanstalk/sock.bean diff --git a/plugins/jobs/broker/beanstalk/stat_test.go b/plugins/jobs/broker/beanstalk/stat_test.go new file mode 100644 index 00000000..14a55859 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/stat_test.go @@ -0,0 +1,66 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // beanstalk reserves job right after push + time.Sleep(time.Millisecond * 100) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) +} diff --git a/plugins/jobs/broker/beanstalk/tube.go b/plugins/jobs/broker/beanstalk/tube.go new file mode 100644 index 00000000..9d7ad117 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/tube.go @@ -0,0 +1,250 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type tube struct { + active int32 + pipe *jobs.Pipeline + mut sync.Mutex + tube *beanstalk.Tube + tubeSet *beanstalk.TubeSet + reserve time.Duration + + // tube events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id uint64 + data []byte +} + +func (e *entry) String() string { + return fmt.Sprintf("%v", e.id) +} + +// create new tube consumer and producer +func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { + if pipe.String("tube", "") == "" { + return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") + } + + return &tube{ + pipe: pipe, + tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, + tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), + reserve: pipe.Duration("reserve", time.Second), + lsn: lsn, + }, nil +} + +// run consumers +func (t *tube) serve(connector connFactory) { + // tube specific consume connection + cn, err := connector.newConn() + if err != nil { + t.report(err) + return + } + defer cn.Close() + + t.wait = make(chan interface{}) + atomic.StoreInt32(&t.active, 1) + + for { + e, err := t.consume(cn) + if err != nil { + if isConnError(err) { + t.report(err) + } + continue + } + + if e == nil { + return + } + + h := <-t.execPool + go func(h jobs.Handler, e *entry) { + err := t.do(cn, h, e) + t.execPool <- h + t.wg.Done() + t.report(err) + }(h, e) + } +} + +// fetch consume +func (t *tube) consume(cn *conn) (*entry, error) { + t.muw.Lock() + defer t.muw.Unlock() + + select { + case <-t.wait: + return nil, nil + default: + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.tubeSet.Conn = conn + + id, data, err := t.tubeSet.Reserve(t.reserve) + cn.release(err) + + if err != nil { + return nil, err + } + + t.wg.Add(1) + return &entry{id: id, data: data}, nil + } +} + +// do data +func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { + j, err := unpack(e.data) + if err != nil { + return err + } + + err = h(e.String(), j) + + // mandatory acquisition + conn, connErr := cn.acquire(true) + if connErr != nil { + // possible if server is dead + return connErr + } + + if err == nil { + return cn.release(conn.Delete(e.id)) + } + + stat, statErr := conn.StatsJob(e.id) + if statErr != nil { + return cn.release(statErr) + } + + t.errHandler(e.String(), j, err) + + reserves, ok := strconv.Atoi(stat["reserves"]) + if ok != nil || !j.Options.CanRetry(reserves-1) { + return cn.release(conn.Bury(e.id, 0)) + } + + return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) +} + +// stop tube consuming +func (t *tube) stop() { + if atomic.LoadInt32(&t.active) == 0 { + return + } + + atomic.StoreInt32(&t.active, 0) + + close(t.wait) + + t.muw.Lock() + t.wg.Wait() + t.muw.Unlock() +} + +// put data into pool or return error (no wait), this method will try to reattempt operation if +// dead conn found. +func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + id, err = t.doPut(cn, attempt, data, delay, rrt) + if err != nil && isConnError(err) { + return t.doPut(cn, attempt, data, delay, rrt) + } + + return id, err +} + +// perform put operation +func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + conn, err := cn.acquire(false) + if err != nil { + return "", err + } + + var bid uint64 + + t.mut.Lock() + t.tube.Conn = conn + bid, err = t.tube.Put(data, 0, delay, rrt) + t.mut.Unlock() + + return strconv.FormatUint(bid, 10), cn.release(err) +} + +// return tube stats (retries) +func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { + stat, err = t.doStat(cn) + if err != nil && isConnError(err) { + return t.doStat(cn) + } + + return stat, err +} + +// return tube stats +func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.mut.Lock() + t.tube.Conn = conn + values, err := t.tube.Stats() + t.mut.Unlock() + + if err != nil { + return nil, cn.release(err) + } + + stat = &jobs.Stat{InternalName: t.tube.Name} + + if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { + stat.Queue = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { + stat.Active = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { + stat.Delayed = int64(v) + } + + return stat, cn.release(nil) +} + +// report tube specific error +func (t *tube) report(err error) { + if err != nil { + t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/broker/beanstalk/tube_test.go b/plugins/jobs/broker/beanstalk/tube_test.go new file mode 100644 index 00000000..b6a646f4 --- /dev/null +++ b/plugins/jobs/broker/beanstalk/tube_test.go @@ -0,0 +1,18 @@ +package beanstalk + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestTube_CantServe(t *testing.T) { + var gctx interface{} + tube := &tube{ + lsn: func(event int, ctx interface{}) { + gctx = ctx + }, + } + + tube.serve(&Config{Addr: "broken"}) + assert.Error(t, gctx.(error)) +} diff --git a/plugins/jobs/broker/ephemeral/broker.go b/plugins/jobs/broker/ephemeral/broker.go new file mode 100644 index 00000000..385bb175 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/broker.go @@ -0,0 +1,174 @@ +package ephemeral + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run queue using local goroutines. +type Broker struct { + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init() (bool, error) { + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() error { + // start consuming + b.mu.Lock() + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve() + } + } + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + // stop all consuming + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.wait != nil { + if q.execPool != nil { + go q.serve() + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q.push(id.String(), j, 0, j.Options.DelayDuration()) + + return id.String(), nil +} + +// Stat must consume statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(), nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/ephemeral/broker_test.go b/plugins/jobs/broker/ephemeral/broker_test.go new file mode 100644 index 00000000..c1b40276 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/broker_test.go @@ -0,0 +1,221 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "local", + "name": "default", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init() + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + b.Init() + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/ephemeral/consume_test.go b/plugins/jobs/broker/ephemeral/consume_test.go new file mode 100644 index 00000000..d764a984 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/consume_test.go @@ -0,0 +1,253 @@ +package ephemeral + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/ephemeral/queue.go b/plugins/jobs/broker/ephemeral/queue.go new file mode 100644 index 00000000..a24bc216 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/queue.go @@ -0,0 +1,161 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + on int32 + state *jobs.Stat + + // job pipeline + concurPool chan interface{} + jobs chan *entry + + // on operations + muw sync.Mutex + wg sync.WaitGroup + + // stop channel + wait chan interface{} + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id string + job *jobs.Job + attempt int +} + +// create new queue +func newQueue(maxConcur int) *queue { + q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} + + if maxConcur != 0 { + q.concurPool = make(chan interface{}, maxConcur) + for i := 0; i < maxConcur; i++ { + q.concurPool <- nil + } + } + + return q +} + +// serve consumers +func (q *queue) serve() { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.on, 1) + + for { + e := q.consume() + if e == nil { + q.wg.Wait() + return + } + + if q.concurPool != nil { + <-q.concurPool + } + + atomic.AddInt64(&q.state.Active, 1) + h := <-q.execPool + + go func(h jobs.Handler, e *entry) { + defer q.wg.Done() + + q.do(h, e) + atomic.AddInt64(&q.state.Active, ^int64(0)) + + q.execPool <- h + + if q.concurPool != nil { + q.concurPool <- nil + } + }(h, e) + } +} + +// allocate one job entry +func (q *queue) consume() *entry { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil + case e := <-q.jobs: + q.wg.Add(1) + + return e + } +} + +// do singe job +func (q *queue) do(h jobs.Handler, e *entry) { + err := h(e.id, e.job) + + if err == nil { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.errHandler(e.id, e.job, err) + + if !e.job.Options.CanRetry(e.attempt) { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.on) == 0 { + return + } + + close(q.wait) + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() + + atomic.StoreInt32(&q.on, 0) +} + +// add job to the queue +func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { + if delay == 0 { + atomic.AddInt64(&q.state.Queue, 1) + go func() { + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() + + return + } + + atomic.AddInt64(&q.state.Delayed, 1) + go func() { + time.Sleep(delay) + atomic.AddInt64(&q.state.Delayed, ^int64(0)) + atomic.AddInt64(&q.state.Queue, 1) + + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() +} + +func (q *queue) stat() *jobs.Stat { + return &jobs.Stat{ + InternalName: ":memory:", + Queue: atomic.LoadInt64(&q.state.Queue), + Active: atomic.LoadInt64(&q.state.Active), + Delayed: atomic.LoadInt64(&q.state.Delayed), + } +} diff --git a/plugins/jobs/broker/ephemeral/stat_test.go b/plugins/jobs/broker/ephemeral/stat_test.go new file mode 100644 index 00000000..0894323c --- /dev/null +++ b/plugins/jobs/broker/ephemeral/stat_test.go @@ -0,0 +1,64 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} diff --git a/plugins/jobs/broker/sqs/broker.go b/plugins/jobs/broker/sqs/broker.go new file mode 100644 index 00000000..8cc62b6b --- /dev/null +++ b/plugins/jobs/broker/sqs/broker.go @@ -0,0 +1,189 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker represents SQS broker. +type Broker struct { + cfg *Config + sqs *sqs.SQS + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures SQS broker. +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + b.sqs, err = b.cfg.SQS() + if err != nil { + return err + } + + for _, q := range b.queues { + q.url, err = q.declareQueue(b.sqs) + if err != nil { + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.sqs, b.cfg.TimeoutDuration()) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + b.wait <- nil + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.sqs != nil && q.execPool != nil { + go q.serve(b.sqs, b.cfg.TimeoutDuration()) + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { + return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) + } + + return q.send(b.sqs, j) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(b.sqs) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/sqs/broker_test.go b/plugins/jobs/broker/sqs/broker_test.go new file mode 100644 index 00000000..c87b302d --- /dev/null +++ b/plugins/jobs/broker/sqs/broker_test.go @@ -0,0 +1,275 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } + + cfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9324", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_RegisterInvalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "sqs", + "name": "default", + })) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "invalid", + "declare": map[string]interface{}{ + "VisibilityTimeout": "invalid", + }, + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + assert.Error(t, b.Serve()) +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/sqs/config.go b/plugins/jobs/broker/sqs/config.go new file mode 100644 index 00000000..d0c2f2b2 --- /dev/null +++ b/plugins/jobs/broker/sqs/config.go @@ -0,0 +1,82 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Region defined SQS region, not required when endpoint is not empty. + Region string + + // Region defined AWS API key, not required when endpoint is not empty. + Key string + + // Region defined AWS API secret, not required when endpoint is not empty. + Secret string + + // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. + Endpoint string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Region == "" { + return fmt.Errorf("SQS region is missing") + } + + if c.Key == "" { + return fmt.Errorf("SQS key is missing") + } + + if c.Secret == "" { + return fmt.Errorf("SQS secret is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// Session returns new AWS session. +func (c *Config) Session() (*session.Session, error) { + return session.NewSession(&aws.Config{ + Region: aws.String(c.Region), + Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), + }) +} + +// SQS returns new SQS instance or error. +func (c *Config) SQS() (*sqs.SQS, error) { + sess, err := c.Session() + if err != nil { + return nil, err + } + + if c.Endpoint == "" { + return sqs.New(sess), nil + } + + return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil +} diff --git a/plugins/jobs/broker/sqs/config_test.go b/plugins/jobs/broker/sqs/config_test.go new file mode 100644 index 00000000..b36b3c6f --- /dev/null +++ b/plugins/jobs/broker/sqs/config_test.go @@ -0,0 +1,48 @@ +package sqs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error5(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/broker/sqs/consume_test.go b/plugins/jobs/broker/sqs/consume_test.go new file mode 100644 index 00000000..434fc6ea --- /dev/null +++ b/plugins/jobs/broker/sqs/consume_test.go @@ -0,0 +1,370 @@ +package sqs + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_PushTooBigDelay(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + Delay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + RetryDelay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled + b.Stop() +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/sqs/durability_test.go b/plugins/jobs/broker/sqs/durability_test.go new file mode 100644 index 00000000..58ddf4b9 --- /dev/null +++ b/plugins/jobs/broker/sqs/durability_test.go @@ -0,0 +1,588 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9325", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:9325", + upstream: "localhost:9324", + accept: true, + } + + proxyPipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "lockReserved": 1, + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(false) + + _, serr = b.Stat(proxyPipe) + assert.Error(t, serr) + + proxy.reset(true) + + _, serr = b.Stat(proxyPipe) + assert.NoError(t, serr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(1).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/broker/sqs/job.go b/plugins/jobs/broker/sqs/job.go new file mode 100644 index 00000000..50e2c164 --- /dev/null +++ b/plugins/jobs/broker/sqs/job.go @@ -0,0 +1,80 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "time" +) + +var jobAttributes = []*string{ + aws.String("rr-job"), + aws.String("rr-maxAttempts"), + aws.String("rr-delay"), + aws.String("rr-timeout"), + aws.String("rr-retryDelay"), +} + +// pack job metadata into headers +func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { + return &sqs.SendMessageInput{ + QueueUrl: url, + DelaySeconds: aws.Int64(int64(j.Options.Delay)), + MessageBody: aws.String(j.Payload), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, + "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, + "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, + "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, + "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, + }, + } +} + +// unpack restores jobs.Options +func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { + if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { + return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") + } + attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) + + for _, attr := range jobAttributes { + if _, ok := msg.MessageAttributes[*attr]; !ok { + return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) + } + } + + j = &jobs.Job{ + Job: *msg.MessageAttributes["rr-job"].StringValue, + Payload: *msg.Body, + Options: &jobs.Options{}, + } + + if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { + j.Options.Delay = delay + } + + if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { + j.Options.Attempts = maxAttempts + } + + if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { + j.Options.Timeout = timeout + } + + if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { + j.Options.RetryDelay = retryDelay + } + + return *msg.MessageId, attempt - 1, j, nil +} + +func awsString(n int) *string { + return aws.String(strconv.Itoa(n)) +} + +func awsDuration(d time.Duration) *string { + return aws.String(strconv.Itoa(int(d.Seconds()))) +} diff --git a/plugins/jobs/broker/sqs/job_test.go b/plugins/jobs/broker/sqs/job_test.go new file mode 100644 index 00000000..a120af53 --- /dev/null +++ b/plugins/jobs/broker/sqs/job_test.go @@ -0,0 +1,19 @@ +package sqs + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack(t *testing.T) { + msg := &sqs.Message{ + Body: aws.String("body"), + Attributes: map[string]*string{}, + MessageAttributes: map[string]*sqs.MessageAttributeValue{}, + } + + _, _, _, err := unpack(msg) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/sqs/queue.go b/plugins/jobs/broker/sqs/queue.go new file mode 100644 index 00000000..8a92448e --- /dev/null +++ b/plugins/jobs/broker/sqs/queue.go @@ -0,0 +1,266 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + active int32 + pipe *jobs.Pipeline + url *string + reserve time.Duration + lockReserved time.Duration + + // queue events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) + } + + return &queue{ + pipe: pipe, + reserve: pipe.Duration("reserve", time.Second), + lockReserved: pipe.Duration("lockReserved", 300*time.Second), + lsn: lsn, + }, nil +} + +// declareQueue declared queue +func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { + attr := make(map[string]*string) + for k, v := range q.pipe.Map("declare") { + if vs, ok := v.(string); ok { + attr[k] = aws.String(vs) + } + + if vi, ok := v.(int); ok { + attr[k] = aws.String(strconv.Itoa(vi)) + } + + if vb, ok := v.(bool); ok { + if vb { + attr[k] = aws.String("true") + } else { + attr[k] = aws.String("false") + } + } + } + + if len(attr) != 0 { + r, err := s.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(q.pipe.String("queue", "")), + Attributes: attr, + }) + + return r.QueueUrl, err + } + + // no need to create (get existed) + r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) + if err != nil { + return nil, err + } + + return r.QueueUrl, nil +} + +// serve consumers +func (q *queue) serve(s *sqs.SQS, tout time.Duration) { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.active, 1) + + var errored bool + for { + messages, stop, err := q.consume(s) + if err != nil { + if errored { + // reoccurring error + time.Sleep(tout) + } else { + errored = true + q.report(err) + } + + continue + } + errored = false + + if stop { + return + } + + for _, msg := range messages { + h := <-q.execPool + go func(h jobs.Handler, msg *sqs.Message) { + err := q.do(s, h, msg) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, msg) + } + } +} + +// consume and allocate connection. +func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil, true, nil + default: + r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: q.url, + MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), + WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), + VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), + AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, + MessageAttributeNames: jobAttributes, + }) + if err != nil { + return nil, false, err + } + + q.wg.Add(len(r.Messages)) + + return r.Messages, false, nil + } +} + +// do single message +func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { + id, attempt, j, err := unpack(msg) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + // block the job based on known timeout + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), + }) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + err = h(id, j) + if err == nil { + return q.deleteMessage(s, msg, nil) + } + + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return q.deleteMessage(s, msg, err) + } + + // retry after specified duration + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), + }) + + return err +} + +func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { + _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) + return drr +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + close(q.wait) + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// add job to the queue +func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { + r, err := s.SendMessage(pack(q.url, j)) + if err != nil { + return "", err + } + + return *r.MessageId, nil +} + +// return queue stats +func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { + r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + QueueUrl: q.url, + AttributeNames: []*string{ + aws.String("ApproximateNumberOfMessages"), + aws.String("ApproximateNumberOfMessagesDelayed"), + aws.String("ApproximateNumberOfMessagesNotVisible"), + }, + }) + + if err != nil { + return nil, err + } + + stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} + + for a, v := range r.Attributes { + if a == "ApproximateNumberOfMessages" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Queue = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesNotVisible" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Active = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesDelayed" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Delayed = int64(v) + } + } + } + + return stat, nil +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/broker/sqs/stat_test.go b/plugins/jobs/broker/sqs/stat_test.go new file mode 100644 index 00000000..5031571b --- /dev/null +++ b/plugins/jobs/broker/sqs/stat_test.go @@ -0,0 +1,60 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // unable to use approximated stats + _, err = b.Stat(pipe) + assert.NoError(t, err) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + _, err := b.Stat(pipe) + assert.NoError(t, err) + + close(waitJob) + return nil + } + + <-waitJob + _, err = b.Stat(pipe) + assert.NoError(t, err) +} diff --git a/plugins/jobs/broker_test.go b/plugins/jobs/broker_test.go new file mode 100644 index 00000000..9625e24b --- /dev/null +++ b/plugins/jobs/broker_test.go @@ -0,0 +1,314 @@ +package jobs + +import ( + "fmt" + "github.com/gofrs/uuid" + "sync" + "sync/atomic" + "time" +) + +// testBroker run testQueue using local goroutines. +type testBroker struct { + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*Pipeline]*testQueue +} + +// Listen attaches server event watcher. +func (b *testBroker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *testBroker) Init() (bool, error) { + b.queues = make(map[*Pipeline]*testQueue) + + return true, nil +} + +// Register broker pipeline. +func (b *testBroker) Register(pipe *Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("testQueue `%s` has already been registered", pipe.Name()) + } + + b.queues[pipe] = newQueue() + + return nil +} + +// Serve broker pipelines. +func (b *testBroker) Serve() error { + // start pipelines + b.mu.Lock() + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve() + } + } + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *testBroker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + // stop all pipelines + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before +// the service is started! +func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined testQueue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.wait != nil { + if q.execPool != nil { + go q.serve() + } + } + + return nil +} + +// Push job into the worker. +func (b *testBroker) Push(pipe *Pipeline, j *Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined testQueue `%s`", pipe.Name()) + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q.push(id.String(), j, 0, j.Options.DelayDuration()) + + return id.String(), nil +} + +// Stat must consume statistics about given pipeline or return error. +func (b *testBroker) Stat(pipe *Pipeline) (stat *Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined testQueue `%s`", pipe.Name()) + } + + return q.stat(), nil +} + +// check if broker is serving +func (b *testBroker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// testQueue returns testQueue associated with the pipeline. +func (b *testBroker) queue(pipe *Pipeline) *testQueue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *testBroker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} + +type testQueue struct { + active int32 + st *Stat + + // job pipeline + jobs chan *entry + + // pipelines operations + muw sync.Mutex + wg sync.WaitGroup + + // stop channel + wait chan interface{} + + // exec handlers + execPool chan Handler + errHandler ErrorHandler +} + +type entry struct { + id string + job *Job + attempt int +} + +// create new testQueue +func newQueue() *testQueue { + return &testQueue{st: &Stat{}, jobs: make(chan *entry)} +} + +// todo NOT USED +// associate testQueue with new do pool +//func (q *testQueue) configure(execPool chan Handler, err ErrorHandler) error { +// q.execPool = execPool +// q.errHandler = err +// +// return nil +//} + +// serve consumers +func (q *testQueue) serve() { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.active, 1) + + for { + e := q.consume() + if e == nil { + return + } + + atomic.AddInt64(&q.st.Active, 1) + h := <-q.execPool + go func(e *entry) { + q.do(h, e) + atomic.AddInt64(&q.st.Active, ^int64(0)) + q.execPool <- h + q.wg.Done() + }(e) + } +} + +// allocate one job entry +func (q *testQueue) consume() *entry { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil + case e := <-q.jobs: + q.wg.Add(1) + + return e + } +} + +// do singe job +func (q *testQueue) do(h Handler, e *entry) { + err := h(e.id, e.job) + + if err == nil { + atomic.AddInt64(&q.st.Queue, ^int64(0)) + return + } + + q.errHandler(e.id, e.job, err) + + if !e.job.Options.CanRetry(e.attempt) { + atomic.AddInt64(&q.st.Queue, ^int64(0)) + return + } + + q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) +} + +// stop the testQueue pipelines +func (q *testQueue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + close(q.wait) + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// add job to the testQueue +func (q *testQueue) push(id string, j *Job, attempt int, delay time.Duration) { + if delay == 0 { + atomic.AddInt64(&q.st.Queue, 1) + go func() { + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() + + return + } + + atomic.AddInt64(&q.st.Delayed, 1) + go func() { + time.Sleep(delay) + atomic.AddInt64(&q.st.Delayed, ^int64(0)) + atomic.AddInt64(&q.st.Queue, 1) + + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() +} + +func (q *testQueue) stat() *Stat { + return &Stat{ + InternalName: ":memory:", + Queue: atomic.LoadInt64(&q.st.Queue), + Active: atomic.LoadInt64(&q.st.Active), + Delayed: atomic.LoadInt64(&q.st.Delayed), + } +} diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go new file mode 100644 index 00000000..674bf468 --- /dev/null +++ b/plugins/jobs/config.go @@ -0,0 +1,91 @@ +package jobs + +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" +) + +// Config defines settings for job broker, workers and job-pipeline mapping. +type Config struct { + // Workers configures roadrunner server and worker busy. + Workers *roadrunner.ServerConfig + + // Dispatch defines where and how to match jobs. + Dispatch map[string]*Options + + // Pipelines defines mapping between PHP job pipeline and associated job broker. + Pipelines map[string]*Pipeline + + // Consuming specifies names of pipelines to be consumed on service start. + Consume []string + + // parent config for broken options. + parent service.Config + pipelines Pipelines + route Dispatcher +} + +// Hydrate populates config values. +func (c *Config) Hydrate(cfg service.Config) (err error) { + c.Workers = &roadrunner.ServerConfig{} + c.Workers.InitDefaults() + + if err := cfg.Unmarshal(&c); err != nil { + return err + } + + c.pipelines, err = initPipelines(c.Pipelines) + if err != nil { + return err + } + + if c.Workers.Command != "" { + if err := c.Workers.Pool.Valid(); err != nil { + return c.Workers.Pool.Valid() + } + } + + c.parent = cfg + c.route = initDispatcher(c.Dispatch) + + return nil +} + +// MatchPipeline locates the pipeline associated with the job. +func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) { + opt := c.route.match(job) + + pipe := "" + if job.Options != nil { + pipe = job.Options.Pipeline + } + + if pipe == "" && opt != nil { + pipe = opt.Pipeline + } + + if pipe == "" { + return nil, nil, fmt.Errorf("unable to locate pipeline for `%s`", job.Job) + } + + if p := c.pipelines.Get(pipe); p != nil { + return p, opt, nil + } + + return nil, nil, fmt.Errorf("undefined pipeline `%s`", pipe) +} + +// Get underlying broker config. +func (c *Config) Get(service string) service.Config { + if c.parent == nil { + return nil + } + + return c.parent.Get(service) +} + +// Unmarshal is doing nothing. +func (c *Config) Unmarshal(out interface{}) error { + return nil +} diff --git a/plugins/jobs/config_test.go b/plugins/jobs/config_test.go new file mode 100644 index 00000000..c55a5c5f --- /dev/null +++ b/plugins/jobs/config_test.go @@ -0,0 +1,158 @@ +package jobs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { + if name == "same" || name == "jobs" { + return cfg + } + + return nil +} +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{cfg: `{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_OK(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{"pool":{"numWorkers": 1}} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Unmarshal(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{"pool":{"numWorkers": 1}} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + var i interface{} + assert.Nil(t, c.Unmarshal(i)) +} + +func Test_Config_Hydrate_Get(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{"pool":{"numWorkers": 1}} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + assert.Nil(t, c.Get("nil")) +} + +func Test_Config_Hydrate_Get_Valid(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{"pool":{"numWorkers": 1}} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + assert.Equal(t, cfg, c.Get("same")) +} + +func Test_Config_Hydrate_GetNoParent(t *testing.T) { + c := &Config{} + assert.Nil(t, c.Get("nil")) +} + +func Test_Pipelines(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{ + "pool":{"numWorkers": 1} + }, + "pipelines":{ + "pipe": {"broker":"broker"} + }, + "dispatch":{ + "job.*": {"pipeline":"default"} + } + }`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + assert.Equal(t, "pipe", c.pipelines.Get("pipe").Name()) + assert.Equal(t, "broker", c.pipelines.Get("pipe").Broker()) +} + +func Test_Pipelines_NoBroker(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{ + "pool":{"numWorkers": 1} + }, + "pipelines":{ + "pipe": {} + }, + "dispatch":{ + "job.*": {"pipeline":"default"} + } + }`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_MatchPipeline(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{ + "pool":{"numWorkers": 1} + }, + "pipelines":{ + "pipe": {"broker":"default"} + }, + "dispatch":{ + "job.*": {"pipeline":"pipe","delay":10} + } + }`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, _, err := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{}}) + assert.Error(t, err) + + p, _, _ := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{Pipeline: "pipe"}}) + assert.Equal(t, "pipe", p.Name()) + + p, opt, _ := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}}) + assert.Equal(t, "pipe", p.Name()) + assert.Equal(t, 10, opt.Delay) +} + +func Test_MatchPipeline_Error(t *testing.T) { + cfg := &mockCfg{cfg: `{ + "workers":{ + "pool":{"numWorkers": 1} + }, + "pipelines":{ + "pipe": {"broker":"default"} + }, + "dispatch":{ + "job.*": {"pipeline":"missing"} + } + }`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, _, err := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}}) + assert.Error(t, err) +} diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go new file mode 100644 index 00000000..9fde8fac --- /dev/null +++ b/plugins/jobs/dispatcher.go @@ -0,0 +1,47 @@ +package jobs + +import ( + "strings" +) + +var separators = []string{"/", "-", "\\"} + +// Dispatcher provides ability to automatically locate the pipeline for the specific job +// and update job options (if none set). +type Dispatcher map[string]*Options + +// pre-compile patterns +func initDispatcher(routes map[string]*Options) Dispatcher { + dispatcher := make(Dispatcher) + for pattern, opts := range routes { + pattern = strings.ToLower(pattern) + pattern = strings.Trim(pattern, "-.*") + + for _, s := range separators { + pattern = strings.Replace(pattern, s, ".", -1) + } + + dispatcher[pattern] = opts + } + + return dispatcher +} + +// match clarifies target job pipeline and other job options. Can return nil. +func (dispatcher Dispatcher) match(job *Job) (found *Options) { + var best = 0 + + jobName := strings.ToLower(job.Job) + for pattern, opts := range dispatcher { + if strings.HasPrefix(jobName, pattern) && len(pattern) > best { + found = opts + best = len(pattern) + } + } + + if best == 0 { + return nil + } + + return found +} diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go new file mode 100644 index 00000000..59e3fd4e --- /dev/null +++ b/plugins/jobs/dispatcher_test.go @@ -0,0 +1,53 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Map_All(t *testing.T) { + m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}}) + assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline) +} + +func Test_Map_Miss(t *testing.T) { + m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}}) + + assert.Nil(t, m.match(&Job{Job: "miss"})) +} + +func Test_Map_Best(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) +} + +func Test_Map_BestUpper(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.Other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline) +} + +func Test_Map_BestReversed(t *testing.T) { + m := initDispatcher(map[string]*Options{ + "some.*": {Pipeline: "default"}, + "some.other.*": {Pipeline: "other"}, + }) + + assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline) + assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline) + assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline) +} diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio new file mode 100644 index 00000000..a8e3778f --- /dev/null +++ b/plugins/jobs/doc/jobs_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-15T11:53:27.842Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="FDJoAwnkRul0Vyh6IqdT" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">ddHBEoIgEADQr+FOoM10NquLJw+dGdmEGXQdpNH6+nTAjKwTy9tdYBbCs2Y8W9GpAiUYwqgcCT8SxnZJsp+WWR5eDmnqobZahqIVSv2EgDToXUvoo0KHaJzuYqywbaFykQlrcYjLbmjiWztRwwbKSpitXrV0yiujlK6JC+haue9MI5bqAL0SEocP4jnhmUV0PmrGDMw8vWUwvu/0J/t+mYXW/WiYgvXsaRN9Ec9f</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/jobs/event.go b/plugins/jobs/event.go new file mode 100644 index 00000000..68dd34e5 --- /dev/null +++ b/plugins/jobs/event.go @@ -0,0 +1,96 @@ +package jobs + +import "time" + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK = iota + 1500 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeConsume when pipeline pipelines has been requested. + EventPipeConsume + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStop when pipeline has begun stopping. + EventPipeStop + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventBrokerReady thrown when broken is ready to accept/serve tasks. + EventBrokerReady +) + +// JobEvent represent job event. +type JobEvent struct { + // String is job id. + ID string + + // Job is failed job. + Job *Job + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *JobEvent) Elapsed() time.Duration { + return e.elapsed +} + +// JobError represents singular Job error event. +type JobError struct { + // String is job id. + ID string + + // Job is failed job. + Job *Job + + // Caused contains job specific error. + Caused error + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *JobError) Elapsed() time.Duration { + return e.elapsed +} + +// Caused returns error message. +func (e *JobError) Error() string { + return e.Caused.Error() +} + +// PipelineError defines pipeline specific errors. +type PipelineError struct { + // Pipeline is associated pipeline. + Pipeline *Pipeline + + // Caused send by broker. + Caused error +} + +// Error returns error message. +func (e *PipelineError) Error() string { + return e.Caused.Error() +} diff --git a/plugins/jobs/event_test.go b/plugins/jobs/event_test.go new file mode 100644 index 00000000..94d53531 --- /dev/null +++ b/plugins/jobs/event_test.go @@ -0,0 +1,52 @@ +package jobs + +import ( + "errors" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestJobEvent_Elapsed(t *testing.T) { + e := &JobEvent{ + ID: "id", + Job: &Job{}, + start: time.Now(), + elapsed: time.Millisecond, + } + + assert.Equal(t, time.Millisecond, e.Elapsed()) +} + +func TestJobError_Elapsed(t *testing.T) { + e := &JobError{ + ID: "id", + Job: &Job{}, + start: time.Now(), + elapsed: time.Millisecond, + } + + assert.Equal(t, time.Millisecond, e.Elapsed()) +} + +func TestJobError_Error(t *testing.T) { + e := &JobError{ + ID: "id", + Job: &Job{}, + start: time.Now(), + elapsed: time.Millisecond, + Caused: errors.New("error"), + } + + assert.Equal(t, time.Millisecond, e.Elapsed()) + assert.Equal(t, "error", e.Error()) +} + +func TestPipelineError_Error(t *testing.T) { + e := &PipelineError{ + Pipeline: &Pipeline{}, + Caused: errors.New("error"), + } + + assert.Equal(t, "error", e.Error()) +} diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go new file mode 100644 index 00000000..b747fcfd --- /dev/null +++ b/plugins/jobs/job.go @@ -0,0 +1,38 @@ +package jobs + +import json "github.com/json-iterator/go" + +// Handler handles job execution. +type Handler func(id string, j *Job) error + +// ErrorHandler handles job execution errors. +type ErrorHandler func(id string, j *Job, err error) + +// Job carries information about single job. +type Job struct { + // Job contains name of job broker (usually PHP class). + Job string `json:"job"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Body packs job payload into binary payload. +func (j *Job) Body() []byte { + return []byte(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +func (j *Job) Context(id string) []byte { + ctx, _ := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + }{ID: id, Job: j.Job}, + ) + + return ctx +} diff --git a/plugins/jobs/job_options.go b/plugins/jobs/job_options.go new file mode 100644 index 00000000..d4c6f0d2 --- /dev/null +++ b/plugins/jobs/job_options.go @@ -0,0 +1,70 @@ +package jobs + +import "time" + +// Options carry information about how to handle given job. +type Options struct { + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int `json:"delay,omitempty"` + + // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry). + // Minimum valuable value is 2. + Attempts int `json:"maxAttempts,omitempty"` + + // RetryDelay defines for how long job should be waiting until next retry. Defaults to none. + RetryDelay int `json:"retryDelay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int `json:"timeout,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Attempts == 0 { + o.Attempts = from.Attempts + } + + if o.Timeout == 0 { + o.Timeout = from.Timeout + } + + if o.RetryDelay == 0 { + o.RetryDelay = from.RetryDelay + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// CanRetry must return true if broker is allowed to re-run the job. +func (o *Options) CanRetry(attempt int) bool { + // Attempts 1 and 0 has identical effect + return o.Attempts > (attempt + 1) +} + +// RetryDuration returns retry delay duration in a form of time.Duration. +func (o *Options) RetryDuration() time.Duration { + return time.Second * time.Duration(o.RetryDelay) +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go new file mode 100644 index 00000000..8caaa935 --- /dev/null +++ b/plugins/jobs/job_options_test.go @@ -0,0 +1,109 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestOptions_CanRetry(t *testing.T) { + opts := &Options{Attempts: 0} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_SameValue(t *testing.T) { + opts := &Options{Attempts: 1} + + assert.False(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) +} + +func TestOptions_CanRetry_Value(t *testing.T) { + opts := &Options{Attempts: 2} + + assert.True(t, opts.CanRetry(0)) + assert.False(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_CanRetry_Value3(t *testing.T) { + opts := &Options{Attempts: 3} + + assert.True(t, opts.CanRetry(0)) + assert.True(t, opts.CanRetry(1)) + assert.False(t, opts.CanRetry(2)) +} + +func TestOptions_RetryDuration(t *testing.T) { + opts := &Options{RetryDelay: 0} + assert.Equal(t, time.Duration(0), opts.RetryDuration()) +} + +func TestOptions_RetryDuration2(t *testing.T) { + opts := &Options{RetryDelay: 1} + assert.Equal(t, time.Second, opts.RetryDuration()) +} + +func TestOptions_DelayDuration(t *testing.T) { + opts := &Options{Delay: 0} + assert.Equal(t, time.Duration(0), opts.DelayDuration()) +} + +func TestOptions_DelayDuration2(t *testing.T) { + opts := &Options{Delay: 1} + assert.Equal(t, time.Second, opts.DelayDuration()) +} + +func TestOptions_TimeoutDuration(t *testing.T) { + opts := &Options{Timeout: 0} + assert.Equal(t, time.Minute*30, opts.TimeoutDuration()) +} + +func TestOptions_TimeoutDuration2(t *testing.T) { + opts := &Options{Timeout: 1} + assert.Equal(t, time.Second, opts.TimeoutDuration()) +} + +func TestOptions_Merge(t *testing.T) { + opts := &Options{} + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "pipeline", opts.Pipeline) + assert.Equal(t, 1, opts.Attempts) + assert.Equal(t, 2, opts.Delay) + assert.Equal(t, 1, opts.Timeout) + assert.Equal(t, 1, opts.RetryDelay) +} + +func TestOptions_MergeKeepOriginal(t *testing.T) { + opts := &Options{ + Pipeline: "default", + Delay: 10, + Timeout: 10, + Attempts: 10, + RetryDelay: 10, + } + + opts.Merge(&Options{ + Pipeline: "pipeline", + Delay: 2, + Timeout: 1, + Attempts: 1, + RetryDelay: 1, + }) + + assert.Equal(t, "default", opts.Pipeline) + assert.Equal(t, 10, opts.Attempts) + assert.Equal(t, 10, opts.Delay) + assert.Equal(t, 10, opts.Timeout) + assert.Equal(t, 10, opts.RetryDelay) +} diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go new file mode 100644 index 00000000..e1938eca --- /dev/null +++ b/plugins/jobs/job_test.go @@ -0,0 +1,18 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestJob_Body(t *testing.T) { + j := &Job{Payload: "hello"} + + assert.Equal(t, []byte("hello"), j.Body()) +} + +func TestJob_Context(t *testing.T) { + j := &Job{Job: "job"} + + assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id")) +} diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go new file mode 100644 index 00000000..58b76e33 --- /dev/null +++ b/plugins/jobs/pipeline.go @@ -0,0 +1,169 @@ +package jobs + +import ( + "fmt" + "time" +) + +// Pipelines is list of Pipeline. +type Pipelines []*Pipeline + +func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) { + out := make(Pipelines, 0) + + for name, pipe := range pipes { + if pipe.Broker() == "" { + return nil, fmt.Errorf("found the pipeline without defined broker") + } + + p := pipe.With("name", name) + out = append(out, &p) + } + + return out, nil +} + +// Reverse returns pipelines in reversed order. +func (ps Pipelines) Reverse() Pipelines { + out := make(Pipelines, len(ps)) + + for i, p := range ps { + out[len(ps)-i-1] = p + } + + return out +} + +// Broker return pipelines associated with specific broker. +func (ps Pipelines) Broker(broker string) Pipelines { + out := make(Pipelines, 0) + + for _, p := range ps { + if p.Broker() != broker { + continue + } + + out = append(out, p) + } + + return out +} + +// Names returns only pipelines with specified names. +func (ps Pipelines) Names(only ...string) Pipelines { + out := make(Pipelines, 0) + + for _, name := range only { + for _, p := range ps { + if p.Name() == name { + out = append(out, p) + } + } + } + + return out +} + +// Get returns pipeline by it'svc name. +func (ps Pipelines) Get(name string) *Pipeline { + // possibly optimize + for _, p := range ps { + if p.Name() == name { + return p + } + } + + return nil +} + +// Pipeline defines pipeline options. +type Pipeline map[string]interface{} + +// With pipeline value. Immutable. +func (p Pipeline) With(name string, value interface{}) Pipeline { + out := make(map[string]interface{}) + for k, v := range p { + out[k] = v + } + out[name] = value + + return Pipeline(out) +} + +// Name returns pipeline name. +func (p Pipeline) Name() string { + return p.String("name", "") +} + +// Broker associated with the pipeline. +func (p Pipeline) Broker() string { + return p.String("broker", "") +} + +// Has checks if value presented in pipeline. +func (p Pipeline) Has(name string) bool { + if _, ok := p[name]; ok { + return true + } + + return false +} + +// Map must return nested map value or empty config. +func (p Pipeline) Map(name string) Pipeline { + out := make(map[string]interface{}) + + if value, ok := p[name]; ok { + if m, ok := value.(map[string]interface{}); ok { + for k, v := range m { + out[k] = v + } + } + } + + return Pipeline(out) +} + +// Bool must return option value as string or return default value. +func (p Pipeline) Bool(name string, d bool) bool { + if value, ok := p[name]; ok { + if b, ok := value.(bool); ok { + return b + } + } + + return d +} + +// String must return option value as string or return default value. +func (p Pipeline) String(name string, d string) string { + if value, ok := p[name]; ok { + if str, ok := value.(string); ok { + return str + } + } + + return d +} + +// Integer must return option value as string or return default value. +func (p Pipeline) Integer(name string, d int) int { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return str + } + } + + return d +} + +// Duration must return option value as time.Duration (seconds) or return default value. +func (p Pipeline) Duration(name string, d time.Duration) time.Duration { + if value, ok := p[name]; ok { + if str, ok := value.(int); ok { + return time.Second * time.Duration(str) + } + } + + return d +} diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go new file mode 100644 index 00000000..b80e75d0 --- /dev/null +++ b/plugins/jobs/pipeline_test.go @@ -0,0 +1,89 @@ +package jobs + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestPipeline_Map(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0)) + assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0)) +} + +func TestPipeline_MapString(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}} + + assert.Equal(t, "default", pipe.Map("options").String("alias", "")) + assert.Equal(t, "", pipe.Map("other").String("alias", "")) +} + +func TestPipeline_Bool(t *testing.T) { + pipe := Pipeline{"value": true} + + assert.Equal(t, true, pipe.Bool("value", false)) + assert.Equal(t, true, pipe.Bool("other", true)) +} + +func TestPipeline_String(t *testing.T) { + pipe := Pipeline{"value": "value"} + + assert.Equal(t, "value", pipe.String("value", "")) + assert.Equal(t, "value", pipe.String("other", "value")) +} + +func TestPipeline_Integer(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, 1, pipe.Integer("value", 0)) + assert.Equal(t, 1, pipe.Integer("other", 1)) +} + +func TestPipeline_Duration(t *testing.T) { + pipe := Pipeline{"value": 1} + + assert.Equal(t, time.Second, pipe.Duration("value", 0)) + assert.Equal(t, time.Second, pipe.Duration("other", time.Second)) +} + +func TestPipeline_Has(t *testing.T) { + pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}} + + assert.Equal(t, true, pipe.Has("options")) + assert.Equal(t, false, pipe.Has("other")) +} + +func TestPipeline_FilterBroker(t *testing.T) { + pipes := Pipelines{ + &Pipeline{"name": "first", "broker": "a"}, + &Pipeline{"name": "second", "broker": "a"}, + &Pipeline{"name": "third", "broker": "b"}, + &Pipeline{"name": "forth", "broker": "b"}, + } + + filtered := pipes.Names("first", "third") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[0].Broker()) + assert.Equal(t, "b", filtered[1].Broker()) + + filtered = pipes.Names("first", "third").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "a", filtered[1].Broker()) + assert.Equal(t, "b", filtered[0].Broker()) + + filtered = pipes.Broker("a") + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[0].Name()) + assert.Equal(t, "second", filtered[1].Name()) + + filtered = pipes.Broker("a").Reverse() + assert.True(t, len(filtered) == 2) + + assert.Equal(t, "first", filtered[1].Name()) + assert.Equal(t, "second", filtered[0].Name()) +} diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go new file mode 100644 index 00000000..cc1ecd99 --- /dev/null +++ b/plugins/jobs/rpc.go @@ -0,0 +1,151 @@ +package jobs + +import ( + "fmt" + "github.com/spiral/roadrunner/util" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []*util.State `json:"workers"` +} + +// PipelineList contains list of pipeline stats. +type PipelineList struct { + // Pipelines is list of pipeline stats. + Pipelines []*Stat `json:"pipelines"` +} + +// Push job to the testQueue. +func (rpc *rpcServer) Push(j *Job, id *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *id, err = rpc.svc.Push(j) + return +} + +// Push job to the testQueue. +func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *ok = true + go rpc.svc.Push(j) + + return +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, w *string) error { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *w = "OK" + return rpc.svc.rr.Reset() +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + pipe := rpc.svc.cfg.pipelines.Get(pipeline) + if pipe == nil { + return fmt.Errorf("undefined pipeline `%s`", pipeline) + } + + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + + *w = "OK" + return nil +} + +// Destroy job pipelines for a given pipeline. +func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) { + if rpc.svc == nil || rpc.svc.rr == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, nil, nil); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Resume job pipelines for a given pipeline. +func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + for _, pipe := range rpc.svc.cfg.pipelines { + if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil { + return err + } + } + + *w = "OK" + return nil +} + +// Workers returns list of pipelines workers and their stats. +func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + w.Workers, err = util.ServerState(rpc.svc.rr) + return err +} + +// Stat returns list of pipelines workers and their stats. +func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) { + if rpc.svc == nil { + return fmt.Errorf("jobs server is not running") + } + + *l = PipelineList{} + for _, p := range rpc.svc.cfg.pipelines { + stat, err := rpc.svc.Stat(p) + if err != nil { + return err + } + + l.Pipelines = append(l.Pipelines, stat) + } + + return err +} diff --git a/plugins/jobs/rpc_test.go b/plugins/jobs/rpc_test.go new file mode 100644 index 00000000..c70ef86f --- /dev/null +++ b/plugins/jobs/rpc_test.go @@ -0,0 +1,657 @@ +package jobs + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" + "io/ioutil" + "syscall" + "testing" +) + +func TestRPC_StatPipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_StatNonActivePipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_StatPipelineWithUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &PipelineList{} + assert.Error(t, cl.Call("jobs.Stat", true, &list)) +} + +func TestRPC_EnableConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeActive { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.Resume", "default", nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_EnableConsumingUndefined(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.Resume", "undefined", &ok)) +} + +func TestRPC_EnableConsumingUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.Resume", "default", &ok)) +} + +func TestRPC_EnableConsumingAllUndefinedBroker(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5005"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + ok := "" + assert.Error(t, cl.Call("jobs.ResumeAll", true, &ok)) +} + +func TestRPC_DisableConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeStopped { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.Stop", "default", nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_DisableConsumingUndefined(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + ok := "" + assert.Error(t, cl.Call("jobs.Stop", "undefined", &ok)) +} + +func TestRPC_EnableAllConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeActive { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.ResumeAll", true, nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, true, list.Pipelines[0].Consuming) +} + +func TestRPC_DisableAllConsuming(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + pipelineReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventPipeStopped { + close(pipelineReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + assert.NoError(t, cl.Call("jobs.StopAll", true, nil)) + + <-pipelineReady + + list := &PipelineList{} + assert.NoError(t, cl.Call("jobs.Stat", true, &list)) + + assert.Len(t, list.Pipelines, 1) + + assert.Equal(t, int64(0), list.Pipelines[0].Queue) + assert.Equal(t, false, list.Pipelines[0].Consuming) +} + +func TestRPC_DoJob(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventJobOK { + close(jobReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + id := "" + assert.NoError(t, cl.Call("jobs.Push", &Job{ + Job: "spiral.jobs.tests.local.job", + Payload: `{"data":100}`, + Options: &Options{}, + }, &id)) + assert.NoError(t, err) + + <-jobReady + + data, err := ioutil.ReadFile("tests/local.job") + assert.NoError(t, err) + defer syscall.Unlink("tests/local.job") + + assert.Contains(t, string(data), id) +} + +func TestRPC_NoOperationOnDeadServer(t *testing.T) { + rc := &rpcServer{nil} + + assert.Error(t, rc.Push(&Job{}, nil)) + assert.Error(t, rc.Reset(true, nil)) + + assert.Error(t, rc.Stop("default", nil)) + assert.Error(t, rc.StopAll(true, nil)) + + assert.Error(t, rc.Resume("default", nil)) + assert.Error(t, rc.ResumeAll(true, nil)) + + assert.Error(t, rc.Workers(true, nil)) + assert.Error(t, rc.Stat(true, nil)) +} + +func TestRPC_Workers(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("rpc", &rpc.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "rpc":{"listen":"tcp://:5004"}, + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + cl, err := rs.Client() + assert.NoError(t, err) + + list := &WorkerList{} + assert.NoError(t, cl.Call("jobs.Workers", true, &list)) + + assert.Len(t, list.Workers, 1) + + pid := list.Workers[0].Pid + assert.NotEqual(t, 0, pid) + + // reset + ok := "" + assert.NoError(t, cl.Call("jobs.Reset", true, &ok)) + + list = &WorkerList{} + assert.NoError(t, cl.Call("jobs.Workers", true, &list)) + + assert.Len(t, list.Workers, 1) + + assert.NotEqual(t, list.Workers[0].Pid, pid) +} diff --git a/plugins/jobs/service.go b/plugins/jobs/service.go new file mode 100644 index 00000000..bb7ce09c --- /dev/null +++ b/plugins/jobs/service.go @@ -0,0 +1,327 @@ +package jobs + +import ( + "fmt" + //"github.com/sirupsen/logrus" + //"github.com/spiral/roadrunner" + //"github.com/spiral/roadrunner/service" + //"github.com/spiral/roadrunner/service/env" + //"github.com/spiral/roadrunner/service/rpc" + "sync" + "sync/atomic" + "time" +) + +// ID defines public service name. +const ID = "jobs" + +// Service wraps roadrunner container and manage set of parent within it. +type Service struct { + // Associated parent + Brokers map[string]Broker + + // brokers and routing config + cfg *Config + + // environment, logger and listeners + //env env.Environment + //log *logrus.Logger + lsn []func(event int, ctx interface{}) + + // server and server controller + //rr *roadrunner.Server + //cr roadrunner.Controller + + // task balancer + execPool chan Handler + + // registered brokers + serving int32 + //brokers service.Container + + // pipelines pipelines + mup sync.Mutex + pipelines map[*Pipeline]bool +} + +// Attach attaches cr. Currently only one cr is supported. +func (svc *Service) Attach(ctr roadrunner.Controller) { + svc.cr = ctr +} + +// AddListener attaches event listeners to the service and all underlying brokers. +func (svc *Service) AddListener(l func(event int, ctx interface{})) { + svc.lsn = append(svc.lsn, l) +} + +// Init configures job service. +func (svc *Service) Init( + cfg service.Config, + log *logrus.Logger, + env env.Environment, + rpc *rpc.Service, +) (ok bool, err error) { + svc.cfg = &Config{} + if err := svc.cfg.Hydrate(cfg); err != nil { + return false, err + } + + svc.env = env + svc.log = log + + if rpc != nil { + if err := rpc.Register(ID, &rpcServer{svc}); err != nil { + return false, err + } + } + + // limit the number of parallel threads + if svc.cfg.Workers.Command != "" { + svc.execPool = make(chan Handler, svc.cfg.Workers.Pool.NumWorkers) + for i := int64(0); i < svc.cfg.Workers.Pool.NumWorkers; i++ { + svc.execPool <- svc.exec + } + + svc.rr = roadrunner.NewServer(svc.cfg.Workers) + } + + svc.pipelines = make(map[*Pipeline]bool) + for _, p := range svc.cfg.pipelines { + svc.pipelines[p] = false + } + + // run all brokers in nested container + svc.brokers = service.NewContainer(log) + for name, b := range svc.Brokers { + svc.brokers.Register(name, b) + if ep, ok := b.(EventProvider); ok { + ep.Listen(svc.throw) + } + } + + // init all broker configs + if err := svc.brokers.Init(svc.cfg); err != nil { + return false, err + } + + // register all pipelines (per broker) + for name, b := range svc.Brokers { + for _, pipe := range svc.cfg.pipelines.Broker(name) { + if err := b.Register(pipe); err != nil { + return false, err + } + } + } + + return true, nil +} + +// Serve serves local rr server and creates broker association. +func (svc *Service) Serve() error { + if svc.rr != nil { + if svc.env != nil { + if err := svc.env.Copy(svc.cfg.Workers); err != nil { + return err + } + } + + // ensure that workers aware of running within jobs + svc.cfg.Workers.SetEnv("rr_jobs", "true") + svc.rr.Listen(svc.throw) + + if svc.cr != nil { + svc.rr.Attach(svc.cr) + } + + if err := svc.rr.Start(); err != nil { + return err + } + defer svc.rr.Stop() + + // start pipelines of all the pipelines + for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...) { + // start pipeline consuming + if err := svc.Consume(p, svc.execPool, svc.error); err != nil { + svc.Stop() + + return err + } + } + } + + atomic.StoreInt32(&svc.serving, 1) + defer atomic.StoreInt32(&svc.serving, 0) + + return svc.brokers.Serve() +} + +// Stop all pipelines and rr server. +func (svc *Service) Stop() { + if atomic.LoadInt32(&svc.serving) == 0 { + return + } + + wg := sync.WaitGroup{} + for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...).Reverse() { + wg.Add(1) + + go func(p *Pipeline) { + defer wg.Done() + if err := svc.Consume(p, nil, nil); err != nil { + svc.throw(EventPipeError, &PipelineError{Pipeline: p, Caused: err}) + } + }(p) + } + + wg.Wait() + svc.brokers.Stop() +} + +// Server returns associated rr server (if any). +func (svc *Service) Server() *roadrunner.Server { + return svc.rr +} + +// Stat returns list of pipelines workers and their stats. +func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error) { + b, ok := svc.Brokers[pipe.Broker()] + if !ok { + return nil, fmt.Errorf("undefined broker `%s`", pipe.Broker()) + } + + stat, err = b.Stat(pipe) + if err != nil { + return nil, err + } + + stat.Pipeline = pipe.Name() + stat.Broker = pipe.Broker() + + svc.mup.Lock() + stat.Consuming = svc.pipelines[pipe] + svc.mup.Unlock() + + return stat, err +} + +// Consume enables or disables pipeline pipelines using given handlers. +func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error { + svc.mup.Lock() + + if execPool != nil { + if svc.pipelines[pipe] { + svc.mup.Unlock() + return nil + } + + svc.throw(EventPipeConsume, pipe) + svc.pipelines[pipe] = true + } else { + if !svc.pipelines[pipe] { + svc.mup.Unlock() + return nil + } + + svc.throw(EventPipeStop, pipe) + svc.pipelines[pipe] = false + } + + broker, ok := svc.Brokers[pipe.Broker()] + if !ok { + svc.mup.Unlock() + return fmt.Errorf("undefined broker `%s`", pipe.Broker()) + } + svc.mup.Unlock() + + if err := broker.Consume(pipe, execPool, errHandler); err != nil { + svc.mup.Lock() + svc.pipelines[pipe] = false + svc.mup.Unlock() + + svc.throw(EventPipeError, &PipelineError{Pipeline: pipe, Caused: err}) + + return err + } + + if execPool != nil { + svc.throw(EventPipeActive, pipe) + } else { + svc.throw(EventPipeStopped, pipe) + } + + return nil +} + +// Push job to associated broker and return job id. +func (svc *Service) Push(job *Job) (string, error) { + pipe, pOpts, err := svc.cfg.MatchPipeline(job) + if err != nil { + return "", err + } + + if pOpts != nil { + job.Options.Merge(pOpts) + } + + broker, ok := svc.Brokers[pipe.Broker()] + if !ok { + return "", fmt.Errorf("undefined broker `%s`", pipe.Broker()) + } + + id, err := broker.Push(pipe, job) + + if err != nil { + svc.throw(EventPushError, &JobError{Job: job, Caused: err}) + } else { + svc.throw(EventPushOK, &JobEvent{ID: id, Job: job}) + } + + return id, err +} + +// exec executed job using local RR server. Make sure that service is started. +func (svc *Service) exec(id string, j *Job) error { + start := time.Now() + svc.throw(EventJobStart, &JobEvent{ID: id, Job: j, start: start}) + + // ignore response for now, possibly add more routing options + _, err := svc.rr.Exec(&roadrunner.Payload{ + Body: j.Body(), + Context: j.Context(id), + }) + + if err == nil { + svc.throw(EventJobOK, &JobEvent{ + ID: id, + Job: j, + start: start, + elapsed: time.Since(start), + }) + } else { + svc.throw(EventJobError, &JobError{ + ID: id, + Job: j, + Caused: err, start: start, + elapsed: time.Since(start), + }) + } + + return err +} + +// register died job, can be used to move to fallback testQueue or log +func (svc *Service) error(id string, j *Job, err error) { + // nothing for now, possibly route to another pipeline +} + +// throw handles service, server and pool events. +func (svc *Service) throw(event int, ctx interface{}) { + for _, l := range svc.lsn { + l(event, ctx) + } + + if event == roadrunner.EventServerFailure { + // underlying rr server is dead, stop everything + svc.Stop() + } +} diff --git a/plugins/jobs/service_test.go b/plugins/jobs/service_test.go new file mode 100644 index 00000000..74781525 --- /dev/null +++ b/plugins/jobs/service_test.go @@ -0,0 +1,458 @@ +package jobs + +import ( + "bytes" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/env" + "github.com/stretchr/testify/assert" + "io/ioutil" + "syscall" + "testing" +) + +func viperConfig(cfg string) service.Config { + v := viper.New() + v.SetConfigType("json") + + err := v.ReadConfig(bytes.NewBuffer([]byte(cfg))) + if err != nil { + panic(err) + } + + return &configWrapper{v} +} + +// configWrapper provides interface bridge between v configs and service.Config. +type configWrapper struct { + v *viper.Viper +} + +// Get nested config section (sub-map), returns nil if section not found. +func (w *configWrapper) Get(key string) service.Config { + sub := w.v.Sub(key) + if sub == nil { + return nil + } + + return &configWrapper{sub} +} + +// Unmarshal unmarshal config data into given struct. +func (w *configWrapper) Unmarshal(out interface{}) error { + return w.v.Unmarshal(out) +} + +func jobs(container service.Container) *Service { + svc, _ := container.Get("jobs") + return svc.(*Service) +} + +func TestService_Init(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) +} + +func TestService_ServeStop(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("env", &env.Service{}) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + <-ready + c.Stop() +} + +func TestService_ServeError(t *testing.T) { + l := logrus.New() + l.Level = logrus.FatalLevel + + c := service.NewContainer(l) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/bad-consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + assert.Error(t, c.Serve()) +} + +func TestService_GetPipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + assert.Equal(t, "ephemeral", jobs(c).cfg.pipelines.Get("default").Broker()) +} + +func TestService_StatPipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + pipe := svc.cfg.pipelines.Get("default") + + stat, err := svc.Stat(pipe) + assert.NoError(t, err) + + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, true, stat.Consuming) +} + +func TestService_StatNonConsumingPipeline(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + pipe := svc.cfg.pipelines.Get("default") + + stat, err := svc.Stat(pipe) + assert.NoError(t, err) + + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, false, stat.Consuming) +} + +func TestService_DoJob(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobReady := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventJobOK { + close(jobReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + + id, err := svc.Push(&Job{ + Job: "spiral.jobs.tests.local.job", + Payload: `{"data":100}`, + Options: &Options{}, + }) + assert.NoError(t, err) + + <-jobReady + + data, err := ioutil.ReadFile("tests/local.job") + assert.NoError(t, err) + defer syscall.Unlink("tests/local.job") + + assert.Contains(t, string(data), id) +} + +func TestService_DoUndefinedJob(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + + _, err := svc.Push(&Job{ + Job: "spiral.jobs.tests.undefined", + Payload: `{"data":100}`, + Options: &Options{}, + }) + assert.Error(t, err) +} + +func TestService_DoJobIntoInvalidBroker(t *testing.T) { + l := logrus.New() + l.Level = logrus.FatalLevel + + c := service.NewContainer(l) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + + _, err := svc.Push(&Job{ + Job: "spiral.jobs.tests.local.job", + Payload: `{"data":100}`, + Options: &Options{}, + }) + assert.Error(t, err) +} + +func TestService_DoStatInvalidBroker(t *testing.T) { + l := logrus.New() + l.Level = logrus.FatalLevel + + c := service.NewContainer(l) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"undefined"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": [] + } +}`))) + + ready := make(chan interface{}) + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + + _, err := svc.Stat(svc.cfg.pipelines.Get("default")) + assert.Error(t, err) +} + +func TestService_DoErrorJob(t *testing.T) { + c := service.NewContainer(logrus.New()) + c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}}) + + assert.NoError(t, c.Init(viperConfig(`{ + "jobs":{ + "workers":{ + "command": "php tests/consumer.php", + "pool.numWorkers": 1 + }, + "pipelines":{"default":{"broker":"ephemeral"}}, + "dispatch": { + "spiral-jobs-tests-local-*.pipeline": "default" + }, + "consume": ["default"] + } +}`))) + + ready := make(chan interface{}) + jobReady := make(chan interface{}) + + var jobErr error + jobs(c).AddListener(func(event int, ctx interface{}) { + if event == EventBrokerReady { + close(ready) + } + + if event == EventJobError { + jobErr = ctx.(error) + close(jobReady) + } + }) + + go func() { c.Serve() }() + defer c.Stop() + <-ready + + svc := jobs(c) + + _, err := svc.Push(&Job{ + Job: "spiral.jobs.tests.local.errorJob", + Payload: `{"data":100}`, + Options: &Options{}, + }) + assert.NoError(t, err) + + <-jobReady + assert.Error(t, jobErr) + assert.Contains(t, jobErr.Error(), "something is wrong") +} diff --git a/plugins/jobs/tests/.rr.yaml b/plugins/jobs/tests/.rr.yaml new file mode 100644 index 00000000..2fd323db --- /dev/null +++ b/plugins/jobs/tests/.rr.yaml @@ -0,0 +1,63 @@ +jobs: + # worker pool configuration + workers: + command: "php consumer.php" + pool: + numWorkers: 4 + + # rabbitmq and similar servers + amqp: + addr: amqp://guest:guest@localhost:5672/ + + # beanstalk configuration + beanstalk: + addr: tcp://localhost:11300 + + # amazon sqs configuration + sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + + # job destinations and options + dispatch: + spiral-jobs-tests-amqp-*.pipeline: amqp + spiral-jobs-tests-local-*.pipeline: local + spiral-jobs-tests-beanstalk-*.pipeline: beanstalk + spiral-jobs-tests-sqs-*.pipeline: sqs + + # list of broker pipelines associated with endpoints + pipelines: + local: + broker: ephemeral + + amqp: + broker: amqp + queue: default + + beanstalk: + broker: beanstalk + tube: default + + sqs: + broker: sqs + queue: default + declare: + MessageRetentionPeriod: 86400 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: ["local", "amqp", "beanstalk", "sqs"] + +metrics: + address: localhost:2112 + +# monitors rr server(s) +limit: + interval: 1 + services: + jobs: + maxMemory: 100 + TTL: 0 + idleTTL: 0 + execTTL: 60
\ No newline at end of file diff --git a/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php b/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php new file mode 100644 index 00000000..637c14d6 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Amqp/BrokerTest.php @@ -0,0 +1,20 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Amqp; + +use Spiral\Jobs\Tests\BaseTest; + +class BrokerTest extends BaseTest +{ + public const JOB = Job::class; + public const ERROR_JOB = ErrorJob::class; +} diff --git a/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php b/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php new file mode 100644 index 00000000..82b6e7e0 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Amqp/ErrorJob.php @@ -0,0 +1,22 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Amqp; + +use Spiral\Jobs\JobHandler; + +class ErrorJob extends JobHandler +{ + public function invoke(string $id): void + { + throw new \Error('something is wrong'); + } +} diff --git a/plugins/jobs/tests/Jobs/Amqp/Job.php b/plugins/jobs/tests/Jobs/Amqp/Job.php new file mode 100644 index 00000000..2c6ad819 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Amqp/Job.php @@ -0,0 +1,26 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Amqp; + +use Spiral\Jobs\JobHandler; + +class Job extends JobHandler +{ + public const JOB_FILE = __DIR__ . '/../../local.job'; + + public function invoke(string $id, array $payload): void + { + file_put_contents(self::JOB_FILE, json_encode( + $payload + compact('id') + )); + } +} diff --git a/plugins/jobs/tests/Jobs/BaseTest.php b/plugins/jobs/tests/Jobs/BaseTest.php new file mode 100644 index 00000000..67f280b5 --- /dev/null +++ b/plugins/jobs/tests/Jobs/BaseTest.php @@ -0,0 +1,115 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Core\Container; +use Spiral\Goridge\RPC; +use Spiral\Goridge\SocketRelay; +use Spiral\Jobs\Options; +use Spiral\Jobs\Queue; +use Spiral\Jobs\Registry\ContainerRegistry; + +abstract class BaseTest extends TestCase +{ + public const JOB = null; + public const ERROR_JOB = null; + + private $job; + private $errorJob; + + public function setUp(): void + { + $this->job = static::JOB; + $this->errorJob = static::ERROR_JOB; + } + + protected function tearDown(): void + { + if (file_exists((static::JOB)::JOB_FILE)) { + unlink((static::JOB)::JOB_FILE); + } + } + + public function testJob(): void + { + $jobs = $this->makeJobs(); + + $id = $jobs->push($this->job, ['data' => 100]); + + $this->assertNotEmpty($id); + + $this->waitForJob(); + $this->assertFileExists($this->job::JOB_FILE); + + $data = json_decode(file_get_contents($this->job::JOB_FILE), true); + $this->assertSame($id, $data['id']); + $this->assertSame(100, $data['data']); + } + + public function testErrorJob(): void + { + $jobs = $this->makeJobs(); + + $id = $jobs->push($this->errorJob, ['data' => 100]); + $this->assertNotEmpty($id); + } + + public function testDelayJob(): void + { + $jobs = $this->makeJobs(); + + $id = $jobs->push($this->job, ['data' => 100], Options::delayed(1)); + + $this->assertNotEmpty($id); + + $this->assertTrue($this->waitForJob() > 1); + $this->assertFileExists($this->job::JOB_FILE); + + $data = json_decode(file_get_contents($this->job::JOB_FILE), true); + $this->assertSame($id, $data['id']); + $this->assertSame(100, $data['data']); + } + + /** + * @expectedException \Spiral\Jobs\Exception\JobException + */ + public function testConnectionException(): void + { + $jobs = new Queue( + new RPC(new SocketRelay('localhost', 6002)), + new ContainerRegistry(new Container()) + ); + + $jobs->push($this->job, ['data' => 100]); + } + + public function makeJobs(): Queue + { + return new Queue( + new RPC(new SocketRelay('localhost', 6001)), + new ContainerRegistry(new Container()) + ); + } + + private function waitForJob(): float + { + $start = microtime(true); + $try = 0; + while (!file_exists($this->job::JOB_FILE) && $try < 10) { + usleep(250000); + $try++; + } + + return microtime(true) - $start; + } +} diff --git a/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php b/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php new file mode 100644 index 00000000..d1ea4682 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Beanstalk/BrokerTest.php @@ -0,0 +1,20 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Beanstalk; + +use Spiral\Jobs\Tests\BaseTest; + +class BrokerTest extends BaseTest +{ + public const JOB = Job::class; + public const ERROR_JOB = ErrorJob::class; +} diff --git a/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php b/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php new file mode 100644 index 00000000..c4349871 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Beanstalk/ErrorJob.php @@ -0,0 +1,22 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Beanstalk; + +use Spiral\Jobs\JobHandler; + +class ErrorJob extends JobHandler +{ + public function invoke(string $id): void + { + throw new \Error('something is wrong'); + } +} diff --git a/plugins/jobs/tests/Jobs/Beanstalk/Job.php b/plugins/jobs/tests/Jobs/Beanstalk/Job.php new file mode 100644 index 00000000..f8bd541a --- /dev/null +++ b/plugins/jobs/tests/Jobs/Beanstalk/Job.php @@ -0,0 +1,26 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Beanstalk; + +use Spiral\Jobs\JobHandler; + +class Job extends JobHandler +{ + public const JOB_FILE = __DIR__ . '/../../local.job'; + + public function invoke(string $id, array $payload): void + { + file_put_contents(self::JOB_FILE, json_encode( + $payload + compact('id') + )); + } +} diff --git a/plugins/jobs/tests/Jobs/Local/BrokerTest.php b/plugins/jobs/tests/Jobs/Local/BrokerTest.php new file mode 100644 index 00000000..9ba83de6 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Local/BrokerTest.php @@ -0,0 +1,20 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Local; + +use Spiral\Jobs\Tests\BaseTest; + +class BrokerTest extends BaseTest +{ + public const JOB = Job::class; + public const ERROR_JOB = ErrorJob::class; +} diff --git a/plugins/jobs/tests/Jobs/Local/ErrorJob.php b/plugins/jobs/tests/Jobs/Local/ErrorJob.php new file mode 100644 index 00000000..70b1365b --- /dev/null +++ b/plugins/jobs/tests/Jobs/Local/ErrorJob.php @@ -0,0 +1,22 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Local; + +use Spiral\Jobs\JobHandler; + +class ErrorJob extends JobHandler +{ + public function invoke(string $id): void + { + throw new \Error('something is wrong'); + } +} diff --git a/plugins/jobs/tests/Jobs/Local/Job.php b/plugins/jobs/tests/Jobs/Local/Job.php new file mode 100644 index 00000000..2f5803c8 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Local/Job.php @@ -0,0 +1,26 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Local; + +use Spiral\Jobs\JobHandler; + +class Job extends JobHandler +{ + public const JOB_FILE = __DIR__ . '/../../local.job'; + + public function invoke(string $id, array $payload): void + { + file_put_contents(self::JOB_FILE, json_encode( + $payload + compact('id') + )); + } +} diff --git a/plugins/jobs/tests/Jobs/OptionsTest.php b/plugins/jobs/tests/Jobs/OptionsTest.php new file mode 100644 index 00000000..5d00794e --- /dev/null +++ b/plugins/jobs/tests/Jobs/OptionsTest.php @@ -0,0 +1,34 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Jobs\Options; + +class OptionsTest extends TestCase +{ + public function testDelay(): void + { + $o = new Options(); + $this->assertNull($o->getDelay()); + $o = $o->withDelay(10); + $this->assertSame(10, $o->getDelay()); + } + + public function testPipeline(): void + { + $o = new Options(); + $this->assertNull($o->getPipeline()); + $o = $o->withPipeline('custom'); + $this->assertSame('custom', $o->getPipeline()); + } +} diff --git a/plugins/jobs/tests/Jobs/RegistryTest.php b/plugins/jobs/tests/Jobs/RegistryTest.php new file mode 100644 index 00000000..7abd75f7 --- /dev/null +++ b/plugins/jobs/tests/Jobs/RegistryTest.php @@ -0,0 +1,43 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Core\Container; +use Spiral\Jobs\Registry\ContainerRegistry; +use Spiral\Jobs\Tests\Local\Job; + +class RegistryTest extends TestCase +{ + public function testMakeJob(): void + { + $factory = new ContainerRegistry(new Container()); + + $j = $factory->getHandler('spiral.jobs.tests.local.job'); + $this->assertInstanceOf(Job::class, $j); + + $this->assertSame(json_encode(['data' => 200]), $j->serialize( + 'spiral.jobs.tests.local.job', + ['data' => 200] + )); + } + + /** + * @expectedException \Spiral\Jobs\Exception\JobException + */ + public function testMakeUndefined(): void + { + $factory = new ContainerRegistry(new Container()); + + $factory->getHandler('spiral.jobs.undefined'); + } +} diff --git a/plugins/jobs/tests/Jobs/ShortCircuitTest.php b/plugins/jobs/tests/Jobs/ShortCircuitTest.php new file mode 100644 index 00000000..c3306385 --- /dev/null +++ b/plugins/jobs/tests/Jobs/ShortCircuitTest.php @@ -0,0 +1,90 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Core\Container; +use Spiral\Jobs\Options; +use Spiral\Jobs\Registry\ContainerRegistry; +use Spiral\Jobs\ShortCircuit; +use Spiral\Jobs\Tests\Local\ErrorJob; +use Spiral\Jobs\Tests\Local\Job; + +class ShortCircuitTest extends TestCase +{ + protected function tearDown(): void + { + if (file_exists(Job::JOB_FILE)) { + unlink(Job::JOB_FILE); + } + } + + public function testLocal(): void + { + $c = new ContainerRegistry(new Container()); + $jobs = new ShortCircuit($c, $c); + + $id = $jobs->push(Job::class, ['data' => 100]); + + $this->assertNotEmpty($id); + + $this->assertFileExists(Job::JOB_FILE); + + $data = json_decode(file_get_contents(Job::JOB_FILE), true); + $this->assertSame($id, $data['id']); + $this->assertSame(100, $data['data']); + } + + public function testLocalDelayed(): void + { + $c = new ContainerRegistry(new Container()); + $jobs = new ShortCircuit($c, $c); + + $t = microtime(true); + $id = $jobs->push(Job::class, ['data' => 100], Options::delayed(1)); + + $this->assertTrue(microtime(true) - $t >= 1); + + $this->assertNotEmpty($id); + + $this->assertFileExists(Job::JOB_FILE); + + $data = json_decode(file_get_contents(Job::JOB_FILE), true); + $this->assertSame($id, $data['id']); + $this->assertSame(100, $data['data']); + } + + /** + * @expectedException \Spiral\Jobs\Exception\JobException + */ + public function testError(): void + { + $c = new ContainerRegistry(new Container()); + $jobs = new ShortCircuit($c, $c); + $jobs->push(ErrorJob::class); + } + + public function testLocalDelay(): void + { + $c = new ContainerRegistry(new Container()); + $jobs = new ShortCircuit($c, $c); + + $id = $jobs->push(Job::class, ['data' => 100], Options::delayed(1)); + $this->assertNotEmpty($id); + + $this->assertFileExists(Job::JOB_FILE); + + $data = json_decode(file_get_contents(Job::JOB_FILE), true); + $this->assertSame($id, $data['id']); + $this->assertSame(100, $data['data']); + } +} diff --git a/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php b/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php new file mode 100644 index 00000000..ccaa96de --- /dev/null +++ b/plugins/jobs/tests/Jobs/Sqs/BrokerTest.php @@ -0,0 +1,20 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Sqs; + +use Spiral\Jobs\Tests\BaseTest; + +class BrokerTest extends BaseTest +{ + public const JOB = Job::class; + public const ERROR_JOB = ErrorJob::class; +} diff --git a/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php b/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php new file mode 100644 index 00000000..738b9f2b --- /dev/null +++ b/plugins/jobs/tests/Jobs/Sqs/ErrorJob.php @@ -0,0 +1,22 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Sqs; + +use Spiral\Jobs\JobHandler; + +class ErrorJob extends JobHandler +{ + public function invoke(string $id): void + { + throw new \Error('something is wrong'); + } +} diff --git a/plugins/jobs/tests/Jobs/Sqs/Job.php b/plugins/jobs/tests/Jobs/Sqs/Job.php new file mode 100644 index 00000000..e22483a8 --- /dev/null +++ b/plugins/jobs/tests/Jobs/Sqs/Job.php @@ -0,0 +1,26 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Jobs\Tests\Sqs; + +use Spiral\Jobs\JobHandler; + +class Job extends JobHandler +{ + public const JOB_FILE = __DIR__ . '/../../local.job'; + + public function invoke(string $id, array $payload): void + { + file_put_contents(self::JOB_FILE, json_encode( + $payload + compact('id') + )); + } +} diff --git a/plugins/jobs/tests/bootstrap.php b/plugins/jobs/tests/bootstrap.php new file mode 100644 index 00000000..b25fdc9d --- /dev/null +++ b/plugins/jobs/tests/bootstrap.php @@ -0,0 +1,16 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +error_reporting(E_ALL | E_STRICT); +ini_set('display_errors', 'stderr'); + +//Composer +require dirname(__DIR__) . '/vendor_php/autoload.php'; diff --git a/plugins/jobs/tests/consumer.php b/plugins/jobs/tests/consumer.php new file mode 100644 index 00000000..ed56edff --- /dev/null +++ b/plugins/jobs/tests/consumer.php @@ -0,0 +1,22 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +use Spiral\Core\Container; +use Spiral\Goridge; +use Spiral\Jobs; +use Spiral\RoadRunner; + +require 'bootstrap.php'; + +$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + +$consumer = new Jobs\Consumer(new Jobs\Registry\ContainerRegistry(new Container())); +$consumer->serve($rr); diff --git a/plugins/jobs/tests/docker-compose.yml b/plugins/jobs/tests/docker-compose.yml new file mode 100644 index 00000000..7b88c9cf --- /dev/null +++ b/plugins/jobs/tests/docker-compose.yml @@ -0,0 +1,22 @@ +version: "3" + +services: + beanstalk: + image: schickling/beanstalkd + ports: + - "11300:11300" + + sqs: + image: vsouza/sqs-local + ports: + - "9324:9324" + + rabbitmq: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + RABBITMQ_DEFAULT_VHOST: / + ports: + - "15672:15672" + - "5672:5672"
\ No newline at end of file |