diff options
Diffstat (limited to 'plugins/jobs/oooold/broker')
39 files changed, 7138 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go new file mode 100644 index 00000000..b47d83ee --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/broker.go @@ -0,0 +1,216 @@ +package amqp + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" +) + +// Broker represents AMQP broker. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + publish *chanPool + consume *chanPool + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures AMQP job broker (always 2 connections). +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.publish.Close() + + if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil { + b.mu.Unlock() + return err + } + defer b.consume.Close() + + for _, q := range b.queues { + err := q.declare(b.publish, q.name, q.key, nil) + if err != nil { + b.mu.Unlock() + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.publish, b.consume) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.publish != nil && q.execPool != nil { + if q.execPool != nil { + go q.serve(b.publish, b.consume) + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil { + return "", err + } + + return id.String(), nil +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + queue, err := q.inspect(b.publish) + if err != nil { + return nil, err + } + + // this the closest approximation we can get for now + return &jobs.Stat{ + InternalName: queue.Name, + Queue: int64(queue.Messages), + Active: int64(atomic.LoadInt32(&q.running)), + }, nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go new file mode 100644 index 00000000..66078099 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/broker_test.go @@ -0,0 +1,419 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "queue": "rr-queue", + "exchange": "rr-exchange", + "prefetch": 1, + } + + cfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +var ( + fanoutPipe = &jobs.Pipeline{ + "broker": "amqp", + "name": "fanout", + "queue": "fanout-queue", + "exchange": "fanout-exchange", + "exchange-type": "fanout", + "prefetch": 1, + } + + fanoutCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5672/", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_BadPipeline(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "amqp", + "name": "default", + "exchange": "rr-exchange", + "prefetch": 1, + })) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_CantStart(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "amqp://guest:guest@localhost:15672/", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal() + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_Queue_RoutingKey(t *testing.T) { + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key") +} + +func TestBroker_Register_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_RoutingKey(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Queue_ExchangeType(t *testing.T) { + pipeWithKey := pipe.With("exchange-type", "direct") + + assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct") +} + +func TestBroker_Register_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "fanout") + + assert.NoError(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Register_With_WrongExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := pipe.With("exchange-type", "xxx") + + assert.Error(t, b.Register(&pipeWithKey)) +} + +func TestBroker_Consume_With_ExchangeType(t *testing.T) { + b := &Broker{} + _, err := b.Init(fanoutCfg) + if err != nil { + t.Fatal(err) + } + + pipeWithKey := fanoutPipe.With("exchange-type", "fanout") + + err = b.Register(&pipeWithKey) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(&pipeWithKey, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/oooold/broker/amqp/config.go new file mode 100644 index 00000000..0ed3a50e --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/config.go @@ -0,0 +1,39 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/). + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("AMQP address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to redial +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go new file mode 100644 index 00000000..1abbb55d --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/config_test.go @@ -0,0 +1,27 @@ +package amqp + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go new file mode 100644 index 00000000..be747776 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/conn.go @@ -0,0 +1,232 @@ +package amqp + +import ( + "fmt" + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" + "sync" + "time" +) + +// manages set of AMQP channels +type chanPool struct { + // timeout to backoff redial + tout time.Duration + url string + + mu *sync.Mutex + + conn *amqp.Connection + channels map[string]*channel + wait chan interface{} + connected chan interface{} +} + +// manages single channel +type channel struct { + ch *amqp.Channel + // todo unused + //consumer string + confirm chan amqp.Confirmation + signal chan error +} + +// newConn creates new watched AMQP connection +func newConn(url string, tout time.Duration) (*chanPool, error) { + conn, err := dial(url) + if err != nil { + return nil, err + } + + cp := &chanPool{ + url: url, + tout: tout, + conn: conn, + mu: &sync.Mutex{}, + channels: make(map[string]*channel), + wait: make(chan interface{}), + connected: make(chan interface{}), + } + + close(cp.connected) + go cp.watch() + return cp, nil +} + +// dial dials to AMQP. +func dial(url string) (*amqp.Connection, error) { + return amqp.Dial(url) +} + +// Close gracefully closes all underlying channels and connection. +func (cp *chanPool) Close() error { + cp.mu.Lock() + + close(cp.wait) + if cp.channels == nil { + return fmt.Errorf("connection is dead") + } + + // close all channels and consume + var wg sync.WaitGroup + for _, ch := range cp.channels { + wg.Add(1) + + go func(ch *channel) { + defer wg.Done() + cp.closeChan(ch, nil) + }(ch) + } + cp.mu.Unlock() + + wg.Wait() + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn != nil { + return cp.conn.Close() + } + + return nil +} + +// waitConnected waits till connection is connected again or eventually closed. +// must only be invoked after connection error has been delivered to channel.signal. +func (cp *chanPool) waitConnected() chan interface{} { + cp.mu.Lock() + defer cp.mu.Unlock() + + return cp.connected +} + +// watch manages connection state and reconnects if needed +func (cp *chanPool) watch() { + for { + select { + case <-cp.wait: + // connection has been closed + return + // here we are waiting for the errors from amqp connection + case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)): + cp.mu.Lock() + // clear connected, since connections are dead + cp.connected = make(chan interface{}) + + // broadcast error to all consume to let them for the tryReconnect + for _, ch := range cp.channels { + ch.signal <- err + } + + // disable channel allocation while server is dead + cp.conn = nil + cp.channels = nil + + // initialize the backoff + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cp.tout + cp.mu.Unlock() + + // reconnect function + reconnect := func() error { + cp.mu.Lock() + conn, err := dial(cp.url) + if err != nil { + // still failing + fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error())) + cp.mu.Unlock() + return err + } + + // TODO ADD LOGGING + fmt.Println("------amqp successfully redialed------") + + // here we are reconnected + // replace the connection + cp.conn = conn + // re-init the channels + cp.channels = make(map[string]*channel) + cp.mu.Unlock() + return nil + } + + // start backoff retry + errb := backoff.Retry(reconnect, expb) + if errb != nil { + fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error())) + // reconnection failed + close(cp.connected) + return + } + close(cp.connected) + } + } +} + +// channel allocates new channel on amqp connection +func (cp *chanPool) channel(name string) (*channel, error) { + cp.mu.Lock() + dead := cp.conn == nil + cp.mu.Unlock() + + if dead { + // wait for connection restoration (doubled the timeout duration) + select { + case <-time.NewTimer(cp.tout * 2).C: + return nil, fmt.Errorf("connection is dead") + case <-cp.connected: + // connected + } + } + + cp.mu.Lock() + defer cp.mu.Unlock() + + if cp.conn == nil { + return nil, fmt.Errorf("connection has been closed") + } + + if ch, ok := cp.channels[name]; ok { + return ch, nil + } + + // we must create new channel + ch, err := cp.conn.Channel() + if err != nil { + return nil, err + } + + // Enable publish confirmations + if err = ch.Confirm(false); err != nil { + return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err) + } + + // we expect that every allocated channel would have listener on signal + // this is not true only in case of pure producing channels + cp.channels[name] = &channel{ + ch: ch, + confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)), + signal: make(chan error, 1), + } + + return cp.channels[name], nil +} + +// closeChan gracefully closes and removes channel allocation. +func (cp *chanPool) closeChan(c *channel, err error) error { + cp.mu.Lock() + defer cp.mu.Unlock() + + go func() { + c.signal <- nil + c.ch.Close() + }() + + for name, ch := range cp.channels { + if ch == c { + delete(cp.channels, name) + } + } + + return err +} diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go new file mode 100644 index 00000000..28999c36 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/consume_test.go @@ -0,0 +1,258 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + start := time.Now() + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 3*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go new file mode 100644 index 00000000..00d62c51 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/durability_test.go @@ -0,0 +1,728 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "amqp://guest:guest@localhost:5673/", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:5673", + upstream: "localhost:5672", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2_2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // start when connection is dead + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + proxy.reset(false) + + _, serr := b.Stat(pipe) + assert.Error(t, serr) + + proxy.reset(true) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + if perr != nil { + panic(perr) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + ch, err := b.consume.channel("purger") + if err != nil { + panic(err) + } + _, err = ch.ch.QueuePurge("rr-queue", false) + if err != nil { + panic(err) + } + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + + if j.Payload == "kill" && len(done) == 0 { + proxy.reset(true) + } + + mu.Lock() + defer mu.Unlock() + done[id] = true + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go new file mode 100644 index 00000000..bd559715 --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/job.go @@ -0,0 +1,56 @@ +package amqp + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" +) + +// pack job metadata into headers +func pack(id string, attempt int, j *jobs.Job) amqp.Table { + return amqp.Table{ + "rr-id": id, + "rr-job": j.Job, + "rr-attempt": int64(attempt), + "rr-maxAttempts": int64(j.Options.Attempts), + "rr-timeout": int64(j.Options.Timeout), + "rr-delay": int64(j.Options.Delay), + "rr-retryDelay": int64(j.Options.RetryDelay), + } +} + +// unpack restores jobs.Options +func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) { + j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}} + + if _, ok := d.Headers["rr-id"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id") + } + + if _, ok := d.Headers["rr-attempt"].(int64); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt") + } + + if _, ok := d.Headers["rr-job"].(string); !ok { + return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job") + } + j.Job = d.Headers["rr-job"].(string) + + if _, ok := d.Headers["rr-maxAttempts"].(int64); ok { + j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64)) + } + + if _, ok := d.Headers["rr-timeout"].(int64); ok { + j.Options.Timeout = int(d.Headers["rr-timeout"].(int64)) + } + + if _, ok := d.Headers["rr-delay"].(int64); ok { + j.Options.Delay = int(d.Headers["rr-delay"].(int64)) + } + + if _, ok := d.Headers["rr-retryDelay"].(int64); ok { + j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64)) + } + + return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil +} diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go new file mode 100644 index 00000000..24ca453b --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/job_test.go @@ -0,0 +1,29 @@ +package amqp + +import ( + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack_Errors(t *testing.T) { + _, _, _, err := unpack(amqp.Delivery{ + Headers: map[string]interface{}{}, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + }, + }) + assert.Error(t, err) + + _, _, _, err = unpack(amqp.Delivery{ + Headers: map[string]interface{}{ + "rr-id": "id", + "rr-attempt": int64(0), + }, + }) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go new file mode 100644 index 00000000..6ef5f20f --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/queue.go @@ -0,0 +1,302 @@ +package amqp + +import ( + "errors" + "fmt" + "github.com/spiral/jobs/v2" + "github.com/streadway/amqp" + "os" + "sync" + "sync/atomic" + "time" +) + +type ExchangeType string + +const ( + Direct ExchangeType = "direct" + Fanout ExchangeType = "fanout" + Topic ExchangeType = "topic" + Headers ExchangeType = "headers" +) + +func (et ExchangeType) IsValid() error { + switch et { + case Direct, Fanout, Topic, Headers: + return nil + } + return errors.New("unknown exchange-type") +} + +func (et ExchangeType) String() string { + switch et { + case Direct, Fanout, Topic, Headers: + return string(et) + default: + return "direct" + } +} + + +type queue struct { + active int32 + pipe *jobs.Pipeline + exchange string + exchangeType ExchangeType + name, key string + consumer string + + // active consuming channel + muc sync.Mutex + cc *channel + + // queue events + lsn func(event int, ctx interface{}) + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + running int32 + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +// newQueue creates new queue wrapper for AMQP. +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline") + } + + exchangeType := ExchangeType(pipe.String("exchange-type", "direct")) + + err := exchangeType.IsValid() + if err != nil { + return nil, fmt.Errorf(err.Error()) + } + + return &queue{ + exchange: pipe.String("exchange", "amqp.direct"), + exchangeType: exchangeType, + name: pipe.String("queue", ""), + key: pipe.String("routing-key", pipe.String("queue", "")), + consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())), + pipe: pipe, + lsn: lsn, + }, nil +} + +// serve consumes queue +func (q *queue) serve(publish, consume *chanPool) { + atomic.StoreInt32(&q.active, 1) + + for { + <-consume.waitConnected() + if atomic.LoadInt32(&q.active) == 0 { + // stopped + return + } + + delivery, cc, err := q.consume(consume) + if err != nil { + q.report(err) + continue + } + + q.muc.Lock() + q.cc = cc + q.muc.Unlock() + + for d := range delivery { + q.muw.Lock() + q.wg.Add(1) + q.muw.Unlock() + + atomic.AddInt32(&q.running, 1) + h := <-q.execPool + + go func(h jobs.Handler, d amqp.Delivery) { + err := q.do(publish, h, d) + + atomic.AddInt32(&q.running, ^int32(0)) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, d) + } + } +} + +func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) { + // allocate channel for the consuming + if cc, err = consume.channel(q.name); err != nil { + return nil, nil, err + } + + if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil) + if err != nil { + return nil, nil, consume.closeChan(cc, err) + } + + // do i like it? + go func(consume *chanPool) { + for err := range cc.signal { + consume.closeChan(cc, err) + return + } + }(consume) + + return delivery, cc, err +} + +func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error { + id, attempt, j, err := unpack(d) + if err != nil { + q.report(err) + return d.Nack(false, false) + } + err = h(id, j) + + if err == nil { + return d.Ack(false) + } + + // failed + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return d.Nack(false, false) + } + + // retry as new j (to accommodate attempt number and new delay) + if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil { + q.report(err) + return d.Nack(false, true) + } + + return d.Ack(false) +} + +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + q.muc.Lock() + if q.cc != nil { + // gracefully stopped consuming + q.report(q.cc.ch.Cancel(q.consumer, true)) + } + q.muc.Unlock() + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// publish message to queue or to delayed queue. +func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + qKey := q.key + + if delay != 0 { + delayMs := int64(delay.Seconds() * 1000) + qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name) + qKey = qName + + err := q.declare(cp, qName, qName, amqp.Table{ + "x-dead-letter-exchange": q.exchange, + "x-dead-letter-routing-key": q.name, + "x-message-ttl": delayMs, + "x-expires": delayMs * 2, + }) + + if err != nil { + return err + } + } + + err = c.ch.Publish( + q.exchange, // exchange + qKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/octet-stream", + Body: j.Body(), + DeliveryMode: amqp.Persistent, + Headers: pack(id, attempt, j), + }, + ) + + if err != nil { + return cp.closeChan(c, err) + } + + confirmed, ok := <-c.confirm + if ok && confirmed.Ack { + return nil + } + + return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag) +} + +// declare queue and binding to it +func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error { + c, err := cp.channel(q.name) + if err != nil { + return err + } + + err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + _, err = c.ch.QueueDeclare(queue, true, false, false, false, args) + if err != nil { + return cp.closeChan(c, err) + } + + err = c.ch.QueueBind(queue, key, q.exchange, false, nil) + if err != nil { + return cp.closeChan(c, err) + } + + // keep channel open + return err +} + +// inspect the queue +func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) { + c, err := cp.channel("stat") + if err != nil { + return nil, err + } + + queue, err := c.ch.QueueInspect(q.name) + if err != nil { + return nil, cp.closeChan(c, err) + } + + // keep channel open + return &queue, err +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go new file mode 100644 index 00000000..ef19746c --- /dev/null +++ b/plugins/jobs/oooold/broker/amqp/stat_test.go @@ -0,0 +1,63 @@ +package amqp + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + wg := &sync.WaitGroup{} + wg.Add(1) + exec <- func(id string, j *jobs.Job) error { + defer wg.Done() + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + return nil + } + + wg.Wait() + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go new file mode 100644 index 00000000..dc3ea518 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/broker.go @@ -0,0 +1,185 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run consume using Broker service. +type Broker struct { + cfg *Config + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + conn *conn + tubes map[*jobs.Pipeline]*tube +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init(cfg *Config) (bool, error) { + b.cfg = cfg + b.tubes = make(map[*jobs.Pipeline]*tube) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.tubes[pipe]; ok { + return fmt.Errorf("tube `%s` has already been registered", pipe.Name()) + } + + t, err := newTube(pipe, b.throw) + if err != nil { + return err + } + + b.tubes[pipe] = t + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + if b.conn, err = b.cfg.newConn(); err != nil { + return err + } + defer b.conn.Close() + + for _, t := range b.tubes { + tt := t + if tt.execPool != nil { + go tt.serve(b.cfg) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, t := range b.tubes { + t.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + t.stop() + + t.execPool = execPool + t.errHandler = errHandler + + if b.conn != nil { + tt := t + if tt.execPool != nil { + go tt.serve(connFactory(b.cfg)) + } + } + + return nil +} + +// Push data into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + t := b.tube(pipe) + if t == nil { + return "", fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + data, err := pack(j) + if err != nil { + return "", err + } + + return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration()) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + t := b.tube(pipe) + if t == nil { + return nil, fmt.Errorf("undefined tube `%s`", pipe.Name()) + } + + return t.stat(b.conn) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) tube(pipe *jobs.Pipeline) *tube { + b.mu.Lock() + defer b.mu.Unlock() + + t, ok := b.tubes[pipe] + if !ok { + return nil + } + + return t +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go new file mode 100644 index 00000000..cd2132af --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/broker_test.go @@ -0,0 +1,276 @@ +package beanstalk + +import ( + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + "tube": "test", + } + + cfg = &Config{ + Addr: "tcp://localhost:11300", + } +) + +func init() { + conn, err := beanstalk.Dial("tcp", "localhost:11300") + if err != nil { + panic(err) + } + defer conn.Close() + + t := beanstalk.Tube{Name: "testTube", Conn: conn} + + for { + id, _, err := t.PeekReady() + if id == 0 || err != nil { + break + } + + if err := conn.Delete(id); err != nil { + panic(err) + } + } +} + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Register_Invalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "beanstalk", + "name": "default", + })) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Error(t *testing.T) { + b := &Broker{} + _, err := b.Init(&Config{ + Addr: "tcp://localhost:11399", + }) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Serve()) +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go new file mode 100644 index 00000000..3e48a2d7 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/config.go @@ -0,0 +1,50 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/roadrunner/service" + "strings" + "time" +) + +// Config defines beanstalk broker configuration. +type Config struct { + // Addr of beanstalk server. + Addr string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Addr == "" { + return fmt.Errorf("beanstalk address is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// size creates new rpc socket Listener. +func (c *Config) newConn() (*conn, error) { + dsn := strings.Split(c.Addr, "://") + if len(dsn) != 2 { + return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)") + } + + return newConn(dsn[0], dsn[1], c.TimeoutDuration()) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go new file mode 100644 index 00000000..4ba08a04 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/config_test.go @@ -0,0 +1,47 @@ +package beanstalk + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func TestConfig_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"addr":""}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func TestConfig_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"addr":"tcp"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} + +func TestConfig_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"addr":"unix://sock.bean"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + _, err := c.newConn() + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go new file mode 100644 index 00000000..7aba6bbb --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/conn.go @@ -0,0 +1,180 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "strings" + "sync" + "time" +) + +var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} + +// creates new connections +type connFactory interface { + newConn() (*conn, error) +} + +// conn protects allocation for one connection between +// threads and provides reconnecting capabilities. +type conn struct { + tout time.Duration + conn *beanstalk.Conn + alive bool + free chan interface{} + dead chan interface{} + stop chan interface{} + lock *sync.Cond +} + +// creates new beanstalk connection and reconnect watcher. +func newConn(network, addr string, tout time.Duration) (cn *conn, err error) { + cn = &conn{ + tout: tout, + alive: true, + free: make(chan interface{}, 1), + dead: make(chan interface{}, 1), + stop: make(chan interface{}), + lock: sync.NewCond(&sync.Mutex{}), + } + + cn.conn, err = beanstalk.Dial(network, addr) + if err != nil { + return nil, err + } + + go cn.watch(network, addr) + + return cn, nil +} + +// reset the connection and reconnect watcher. +func (cn *conn) Close() error { + cn.lock.L.Lock() + defer cn.lock.L.Unlock() + + close(cn.stop) + for cn.alive { + cn.lock.Wait() + } + + return nil +} + +// acquire connection instance or return error in case of timeout. When mandratory set to true +// timeout won't be applied. +func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) { + // do not apply timeout on mandatory connections + if mandatory { + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + } + } + + select { + case <-cn.stop: + return nil, fmt.Errorf("connection closed") + case <-cn.free: + return cn.conn, nil + default: + // *2 to handle commands called right after the connection reset + tout := time.NewTimer(cn.tout * 2) + select { + case <-cn.stop: + tout.Stop() + return nil, fmt.Errorf("connection closed") + case <-cn.free: + tout.Stop() + return cn.conn, nil + case <-tout.C: + return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout) + } + } +} + +// release acquired connection. +func (cn *conn) release(err error) error { + if isConnError(err) { + // reconnect is required + cn.dead <- err + } else { + cn.free <- nil + } + + return err +} + +// watch and reconnect if dead +func (cn *conn) watch(network, addr string) { + cn.free <- nil + t := time.NewTicker(WatchThrottleLimit) + defer t.Stop() + for { + select { + case <-cn.dead: + // simple throttle limiter + <-t.C + // try to reconnect + // TODO add logging here + expb := backoff.NewExponentialBackOff() + expb.MaxInterval = cn.tout + + reconnect := func() error { + conn, err := beanstalk.Dial(network, addr) + if err != nil { + fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error())) + return err + } + + // TODO ADD LOGGING + fmt.Println("------beanstalk successfully redialed------") + + cn.conn = conn + cn.free <- nil + return nil + } + + err := backoff.Retry(reconnect, expb) + if err != nil { + fmt.Println(fmt.Sprintf("redial failed: %s", err.Error())) + cn.dead <- nil + } + + case <-cn.stop: + cn.lock.L.Lock() + select { + case <-cn.dead: + case <-cn.free: + } + + // stop underlying connection + cn.conn.Close() + cn.alive = false + cn.lock.Signal() + + cn.lock.L.Unlock() + + return + } + } +} + +// isConnError indicates that error is related to dead socket. +func isConnError(err error) bool { + if err == nil { + return false + } + + for _, errStr := range connErrors { + // golang... + if strings.Contains(err.Error(), errStr) { + return true + } + } + + return false +} diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go new file mode 100644 index 00000000..84be305e --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/constants.go @@ -0,0 +1,6 @@ +package beanstalk + +import "time" + +// WatchThrottleLimit is used to limit reconnection occurrence in watch function +const WatchThrottleLimit = time.Second
\ No newline at end of file diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go new file mode 100644 index 00000000..b16866ae --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/consume_test.go @@ -0,0 +1,242 @@ +package beanstalk + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed >= time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go new file mode 100644 index 00000000..499a5206 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/durability_test.go @@ -0,0 +1,575 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Addr: "tcp://localhost:11301", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:11301", + upstream: "localhost:11300", + accept: true, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // reoccuring + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NotEqual(t, "0", jid) + + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2).reset(false) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(true) + + // auto-reconnect + _, serr = b.Stat(pipe) + assert.NoError(t, serr) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(2) + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(pipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + done[id] = true + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + st, err := b.Stat(pipe) + if err != nil { + continue + } + + // wait till pipeline is empty + if st.Queue+st.Active == 0 { + return + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(2).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go new file mode 100644 index 00000000..fd9c8c3c --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/job.go @@ -0,0 +1,24 @@ +package beanstalk + +import ( + "bytes" + "encoding/gob" + "github.com/spiral/jobs/v2" +) + +func pack(j *jobs.Job) ([]byte, error) { + b := new(bytes.Buffer) + err := gob.NewEncoder(b).Encode(j) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func unpack(data []byte) (*jobs.Job, error) { + j := &jobs.Job{} + err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j) + + return j, err +} diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/sock.bean diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go new file mode 100644 index 00000000..14a55859 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/stat_test.go @@ -0,0 +1,66 @@ +package beanstalk + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + b.Register(pipe) + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // beanstalk reserves job right after push + time.Sleep(time.Millisecond * 100) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) +} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go new file mode 100644 index 00000000..9d7ad117 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/tube.go @@ -0,0 +1,250 @@ +package beanstalk + +import ( + "fmt" + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type tube struct { + active int32 + pipe *jobs.Pipeline + mut sync.Mutex + tube *beanstalk.Tube + tubeSet *beanstalk.TubeSet + reserve time.Duration + + // tube events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id uint64 + data []byte +} + +func (e *entry) String() string { + return fmt.Sprintf("%v", e.id) +} + +// create new tube consumer and producer +func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) { + if pipe.String("tube", "") == "" { + return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline") + } + + return &tube{ + pipe: pipe, + tube: &beanstalk.Tube{Name: pipe.String("tube", "")}, + tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")), + reserve: pipe.Duration("reserve", time.Second), + lsn: lsn, + }, nil +} + +// run consumers +func (t *tube) serve(connector connFactory) { + // tube specific consume connection + cn, err := connector.newConn() + if err != nil { + t.report(err) + return + } + defer cn.Close() + + t.wait = make(chan interface{}) + atomic.StoreInt32(&t.active, 1) + + for { + e, err := t.consume(cn) + if err != nil { + if isConnError(err) { + t.report(err) + } + continue + } + + if e == nil { + return + } + + h := <-t.execPool + go func(h jobs.Handler, e *entry) { + err := t.do(cn, h, e) + t.execPool <- h + t.wg.Done() + t.report(err) + }(h, e) + } +} + +// fetch consume +func (t *tube) consume(cn *conn) (*entry, error) { + t.muw.Lock() + defer t.muw.Unlock() + + select { + case <-t.wait: + return nil, nil + default: + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.tubeSet.Conn = conn + + id, data, err := t.tubeSet.Reserve(t.reserve) + cn.release(err) + + if err != nil { + return nil, err + } + + t.wg.Add(1) + return &entry{id: id, data: data}, nil + } +} + +// do data +func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error { + j, err := unpack(e.data) + if err != nil { + return err + } + + err = h(e.String(), j) + + // mandatory acquisition + conn, connErr := cn.acquire(true) + if connErr != nil { + // possible if server is dead + return connErr + } + + if err == nil { + return cn.release(conn.Delete(e.id)) + } + + stat, statErr := conn.StatsJob(e.id) + if statErr != nil { + return cn.release(statErr) + } + + t.errHandler(e.String(), j, err) + + reserves, ok := strconv.Atoi(stat["reserves"]) + if ok != nil || !j.Options.CanRetry(reserves-1) { + return cn.release(conn.Bury(e.id, 0)) + } + + return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration())) +} + +// stop tube consuming +func (t *tube) stop() { + if atomic.LoadInt32(&t.active) == 0 { + return + } + + atomic.StoreInt32(&t.active, 0) + + close(t.wait) + + t.muw.Lock() + t.wg.Wait() + t.muw.Unlock() +} + +// put data into pool or return error (no wait), this method will try to reattempt operation if +// dead conn found. +func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + id, err = t.doPut(cn, attempt, data, delay, rrt) + if err != nil && isConnError(err) { + return t.doPut(cn, attempt, data, delay, rrt) + } + + return id, err +} + +// perform put operation +func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) { + conn, err := cn.acquire(false) + if err != nil { + return "", err + } + + var bid uint64 + + t.mut.Lock() + t.tube.Conn = conn + bid, err = t.tube.Put(data, 0, delay, rrt) + t.mut.Unlock() + + return strconv.FormatUint(bid, 10), cn.release(err) +} + +// return tube stats (retries) +func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) { + stat, err = t.doStat(cn) + if err != nil && isConnError(err) { + return t.doStat(cn) + } + + return stat, err +} + +// return tube stats +func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) { + conn, err := cn.acquire(false) + if err != nil { + return nil, err + } + + t.mut.Lock() + t.tube.Conn = conn + values, err := t.tube.Stats() + t.mut.Unlock() + + if err != nil { + return nil, cn.release(err) + } + + stat = &jobs.Stat{InternalName: t.tube.Name} + + if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil { + stat.Queue = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil { + stat.Active = int64(v) + } + + if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil { + stat.Delayed = int64(v) + } + + return stat, cn.release(nil) +} + +// report tube specific error +func (t *tube) report(err error) { + if err != nil { + t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go new file mode 100644 index 00000000..b6a646f4 --- /dev/null +++ b/plugins/jobs/oooold/broker/beanstalk/tube_test.go @@ -0,0 +1,18 @@ +package beanstalk + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestTube_CantServe(t *testing.T) { + var gctx interface{} + tube := &tube{ + lsn: func(event int, ctx interface{}) { + gctx = ctx + }, + } + + tube.serve(&Config{Addr: "broken"}) + assert.Error(t, gctx.(error)) +} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go new file mode 100644 index 00000000..385bb175 --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/broker.go @@ -0,0 +1,174 @@ +package ephemeral + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run queue using local goroutines. +type Broker struct { + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init() (bool, error) { + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() error { + // start consuming + b.mu.Lock() + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve() + } + } + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + // stop all consuming + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.wait != nil { + if q.execPool != nil { + go q.serve() + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q.push(id.String(), j, 0, j.Options.DelayDuration()) + + return id.String(), nil +} + +// Stat must consume statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(), nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go new file mode 100644 index 00000000..c1b40276 --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/broker_test.go @@ -0,0 +1,221 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "local", + "name": "default", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init() + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + b.Init() + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go new file mode 100644 index 00000000..d764a984 --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/consume_test.go @@ -0,0 +1,253 @@ +package ephemeral + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go new file mode 100644 index 00000000..a24bc216 --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/queue.go @@ -0,0 +1,161 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + on int32 + state *jobs.Stat + + // job pipeline + concurPool chan interface{} + jobs chan *entry + + // on operations + muw sync.Mutex + wg sync.WaitGroup + + // stop channel + wait chan interface{} + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id string + job *jobs.Job + attempt int +} + +// create new queue +func newQueue(maxConcur int) *queue { + q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} + + if maxConcur != 0 { + q.concurPool = make(chan interface{}, maxConcur) + for i := 0; i < maxConcur; i++ { + q.concurPool <- nil + } + } + + return q +} + +// serve consumers +func (q *queue) serve() { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.on, 1) + + for { + e := q.consume() + if e == nil { + q.wg.Wait() + return + } + + if q.concurPool != nil { + <-q.concurPool + } + + atomic.AddInt64(&q.state.Active, 1) + h := <-q.execPool + + go func(h jobs.Handler, e *entry) { + defer q.wg.Done() + + q.do(h, e) + atomic.AddInt64(&q.state.Active, ^int64(0)) + + q.execPool <- h + + if q.concurPool != nil { + q.concurPool <- nil + } + }(h, e) + } +} + +// allocate one job entry +func (q *queue) consume() *entry { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil + case e := <-q.jobs: + q.wg.Add(1) + + return e + } +} + +// do singe job +func (q *queue) do(h jobs.Handler, e *entry) { + err := h(e.id, e.job) + + if err == nil { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.errHandler(e.id, e.job, err) + + if !e.job.Options.CanRetry(e.attempt) { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.on) == 0 { + return + } + + close(q.wait) + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() + + atomic.StoreInt32(&q.on, 0) +} + +// add job to the queue +func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { + if delay == 0 { + atomic.AddInt64(&q.state.Queue, 1) + go func() { + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() + + return + } + + atomic.AddInt64(&q.state.Delayed, 1) + go func() { + time.Sleep(delay) + atomic.AddInt64(&q.state.Delayed, ^int64(0)) + atomic.AddInt64(&q.state.Queue, 1) + + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() +} + +func (q *queue) stat() *jobs.Stat { + return &jobs.Stat{ + InternalName: ":memory:", + Queue: atomic.LoadInt64(&q.state.Queue), + Active: atomic.LoadInt64(&q.state.Active), + Delayed: atomic.LoadInt64(&q.state.Delayed), + } +} diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go new file mode 100644 index 00000000..0894323c --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/stat_test.go @@ -0,0 +1,64 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go new file mode 100644 index 00000000..8cc62b6b --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/broker.go @@ -0,0 +1,189 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker represents SQS broker. +type Broker struct { + cfg *Config + sqs *sqs.SQS + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures SQS broker. +func (b *Broker) Init(cfg *Config) (ok bool, err error) { + b.cfg = cfg + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + q, err := newQueue(pipe, b.throw) + if err != nil { + return err + } + + b.queues[pipe] = q + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() (err error) { + b.mu.Lock() + + b.sqs, err = b.cfg.SQS() + if err != nil { + return err + } + + for _, q := range b.queues { + q.url, err = q.declareQueue(b.sqs) + if err != nil { + return err + } + } + + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve(b.sqs, b.cfg.TimeoutDuration()) + } + } + + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + for _, q := range b.queues { + q.stop() + } + + b.wait <- nil + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.sqs != nil && q.execPool != nil { + go q.serve(b.sqs, b.cfg.TimeoutDuration()) + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + if j.Options.Delay > 900 || j.Options.RetryDelay > 900 { + return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name()) + } + + return q.send(b.sqs, j) +} + +// Stat must fetch statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(b.sqs) +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go new file mode 100644 index 00000000..c87b302d --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/broker_test.go @@ -0,0 +1,275 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } + + cfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9324", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_RegisterInvalid(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.Error(t, b.Register(&jobs.Pipeline{ + "broker": "sqs", + "name": "default", + })) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "invalid", + "declare": map[string]interface{}{ + "VisibilityTimeout": "invalid", + }, + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + b.Consume(pipe, exec, errf) + + assert.Error(t, b.Serve()) +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go new file mode 100644 index 00000000..d0c2f2b2 --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/config.go @@ -0,0 +1,82 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/roadrunner/service" + "time" +) + +// Config defines sqs broker configuration. +type Config struct { + // Region defined SQS region, not required when endpoint is not empty. + Region string + + // Region defined AWS API key, not required when endpoint is not empty. + Key string + + // Region defined AWS API secret, not required when endpoint is not empty. + Secret string + + // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development. + Endpoint string + + // Timeout to allocate the connection. Default 10 seconds. + Timeout int +} + +// Hydrate config values. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.Region == "" { + return fmt.Errorf("SQS region is missing") + } + + if c.Key == "" { + return fmt.Errorf("SQS key is missing") + } + + if c.Secret == "" { + return fmt.Errorf("SQS secret is missing") + } + + return nil +} + +// TimeoutDuration returns number of seconds allowed to allocate the connection. +func (c *Config) TimeoutDuration() time.Duration { + timeout := c.Timeout + if timeout == 0 { + timeout = 10 + } + + return time.Duration(timeout) * time.Second +} + +// Session returns new AWS session. +func (c *Config) Session() (*session.Session, error) { + return session.NewSession(&aws.Config{ + Region: aws.String(c.Region), + Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""), + }) +} + +// SQS returns new SQS instance or error. +func (c *Config) SQS() (*sqs.SQS, error) { + sess, err := c.Session() + if err != nil { + return nil, err + } + + if c.Endpoint == "" { + return sqs.New(sess), nil + } + + return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil +} diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go new file mode 100644 index 00000000..b36b3c6f --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/config_test.go @@ -0,0 +1,48 @@ +package sqs + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error3(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error4(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error5(t *testing.T) { + cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go new file mode 100644 index 00000000..434fc6ea --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/consume_test.go @@ -0,0 +1,370 @@ +package sqs + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) { + pipe := &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + } + + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_PushTooBigDelay(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + Delay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_Consume_PushTooBigDelay2(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{ + RetryDelay: 901, + }, + }) + + assert.Error(t, perr) +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled + b.Stop() +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go new file mode 100644 index 00000000..58ddf4b9 --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/durability_test.go @@ -0,0 +1,588 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "io" + "net" + "sync" + "testing" + "time" +) + +var ( + proxyCfg = &Config{ + Key: "api-key", + Secret: "api-secret", + Region: "us-west-1", + Endpoint: "http://localhost:9325", + Timeout: 1, + } + + proxy = &tcpProxy{ + listen: "localhost:9325", + upstream: "localhost:9324", + accept: true, + } + + proxyPipe = &jobs.Pipeline{ + "broker": "sqs", + "name": "default", + "queue": "test", + "lockReserved": 1, + "declare": map[string]interface{}{ + "MessageRetentionPeriod": 86400, + }, + } +) + +type tcpProxy struct { + listen string + upstream string + mu sync.Mutex + accept bool + conn []net.Conn +} + +func (p *tcpProxy) serve() { + l, err := net.Listen("tcp", p.listen) + if err != nil { + panic(err) + } + + for { + in, err := l.Accept() + if err != nil { + panic(err) + } + + if !p.accepting() { + in.Close() + } + + up, err := net.Dial("tcp", p.upstream) + if err != nil { + panic(err) + } + + go io.Copy(in, up) + go io.Copy(up, in) + + p.mu.Lock() + p.conn = append(p.conn, in, up) + p.mu.Unlock() + } +} + +// wait for specific number of connections +func (p *tcpProxy) waitConn(count int) *tcpProxy { + p.mu.Lock() + p.accept = true + p.mu.Unlock() + + for { + p.mu.Lock() + current := len(p.conn) + p.mu.Unlock() + + if current >= count*2 { + break + } + + time.Sleep(time.Millisecond) + } + + return p +} + +func (p *tcpProxy) reset(accept bool) int { + p.mu.Lock() + p.accept = accept + defer p.mu.Unlock() + + count := 0 + for _, conn := range p.conn { + conn.Close() + count++ + } + + p.conn = nil + return count / 2 +} + +func (p *tcpProxy) accepting() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.accept +} + +func init() { + go proxy.serve() +} + +func TestBroker_Durability_Base(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + // expect 2 connections + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Durability_Consume(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume_LongTimeout(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + time.Sleep(3 * time.Second) + proxy.waitConn(1) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume2(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1).reset(false) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.Error(t, perr) + + // restore + proxy.waitConn(2) + + jid, perr = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + proxy.reset(false) + + _, serr = b.Stat(proxyPipe) + assert.Error(t, serr) + + proxy.reset(true) + + _, serr = b.Stat(proxyPipe) + assert.NoError(t, serr) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume3(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + jid, perr := b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(1), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 1 { + break + } + } +} + +func TestBroker_Durability_Consume4(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + proxy.waitConn(1) + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "kill", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(proxyPipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Timeout: 2}, + }) + if err != nil { + t.Fatal(err) + } + + st, serr := b.Stat(proxyPipe) + assert.NoError(t, serr) + assert.Equal(t, int64(3), st.Queue+st.Active) + + mu := sync.Mutex{} + done := make(map[string]bool) + exec <- func(id string, j *jobs.Job) error { + mu.Lock() + defer mu.Unlock() + done[id] = true + + if j.Payload == "kill" { + proxy.reset(true) + } + + return nil + } + + for { + mu.Lock() + num := len(done) + mu.Unlock() + + if num >= 3 { + break + } + } +} + +func TestBroker_Durability_StopDead(t *testing.T) { + defer proxy.reset(true) + + b := &Broker{} + _, err := b.Init(proxyCfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(proxyPipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + + <-ready + + proxy.waitConn(1).reset(false) + + b.Stop() +} diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go new file mode 100644 index 00000000..50e2c164 --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/job.go @@ -0,0 +1,80 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "time" +) + +var jobAttributes = []*string{ + aws.String("rr-job"), + aws.String("rr-maxAttempts"), + aws.String("rr-delay"), + aws.String("rr-timeout"), + aws.String("rr-retryDelay"), +} + +// pack job metadata into headers +func pack(url *string, j *jobs.Job) *sqs.SendMessageInput { + return &sqs.SendMessageInput{ + QueueUrl: url, + DelaySeconds: aws.Int64(int64(j.Options.Delay)), + MessageBody: aws.String(j.Payload), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)}, + "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)}, + "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())}, + "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())}, + "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())}, + }, + } +} + +// unpack restores jobs.Options +func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) { + if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok { + return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount") + } + attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"]) + + for _, attr := range jobAttributes { + if _, ok := msg.MessageAttributes[*attr]; !ok { + return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr) + } + } + + j = &jobs.Job{ + Job: *msg.MessageAttributes["rr-job"].StringValue, + Payload: *msg.Body, + Options: &jobs.Options{}, + } + + if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil { + j.Options.Delay = delay + } + + if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil { + j.Options.Attempts = maxAttempts + } + + if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil { + j.Options.Timeout = timeout + } + + if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil { + j.Options.RetryDelay = retryDelay + } + + return *msg.MessageId, attempt - 1, j, nil +} + +func awsString(n int) *string { + return aws.String(strconv.Itoa(n)) +} + +func awsDuration(d time.Duration) *string { + return aws.String(strconv.Itoa(int(d.Seconds()))) +} diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go new file mode 100644 index 00000000..a120af53 --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/job_test.go @@ -0,0 +1,19 @@ +package sqs + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_Unpack(t *testing.T) { + msg := &sqs.Message{ + Body: aws.String("body"), + Attributes: map[string]*string{}, + MessageAttributes: map[string]*sqs.MessageAttributeValue{}, + } + + _, _, _, err := unpack(msg) + assert.Error(t, err) +} diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go new file mode 100644 index 00000000..8a92448e --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/queue.go @@ -0,0 +1,266 @@ +package sqs + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/spiral/jobs/v2" + "strconv" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + active int32 + pipe *jobs.Pipeline + url *string + reserve time.Duration + lockReserved time.Duration + + // queue events + lsn func(event int, ctx interface{}) + + // stop channel + wait chan interface{} + + // active operations + muw sync.RWMutex + wg sync.WaitGroup + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) { + if pipe.String("queue", "") == "" { + return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name()) + } + + return &queue{ + pipe: pipe, + reserve: pipe.Duration("reserve", time.Second), + lockReserved: pipe.Duration("lockReserved", 300*time.Second), + lsn: lsn, + }, nil +} + +// declareQueue declared queue +func (q *queue) declareQueue(s *sqs.SQS) (*string, error) { + attr := make(map[string]*string) + for k, v := range q.pipe.Map("declare") { + if vs, ok := v.(string); ok { + attr[k] = aws.String(vs) + } + + if vi, ok := v.(int); ok { + attr[k] = aws.String(strconv.Itoa(vi)) + } + + if vb, ok := v.(bool); ok { + if vb { + attr[k] = aws.String("true") + } else { + attr[k] = aws.String("false") + } + } + } + + if len(attr) != 0 { + r, err := s.CreateQueue(&sqs.CreateQueueInput{ + QueueName: aws.String(q.pipe.String("queue", "")), + Attributes: attr, + }) + + return r.QueueUrl, err + } + + // no need to create (get existed) + r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))}) + if err != nil { + return nil, err + } + + return r.QueueUrl, nil +} + +// serve consumers +func (q *queue) serve(s *sqs.SQS, tout time.Duration) { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.active, 1) + + var errored bool + for { + messages, stop, err := q.consume(s) + if err != nil { + if errored { + // reoccurring error + time.Sleep(tout) + } else { + errored = true + q.report(err) + } + + continue + } + errored = false + + if stop { + return + } + + for _, msg := range messages { + h := <-q.execPool + go func(h jobs.Handler, msg *sqs.Message) { + err := q.do(s, h, msg) + q.execPool <- h + q.wg.Done() + q.report(err) + }(h, msg) + } + } +} + +// consume and allocate connection. +func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil, true, nil + default: + r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: q.url, + MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))), + WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())), + VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())), + AttributeNames: []*string{aws.String("ApproximateReceiveCount")}, + MessageAttributeNames: jobAttributes, + }) + if err != nil { + return nil, false, err + } + + q.wg.Add(len(r.Messages)) + + return r.Messages, false, nil + } +} + +// do single message +func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) { + id, attempt, j, err := unpack(msg) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + // block the job based on known timeout + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())), + }) + if err != nil { + go q.deleteMessage(s, msg, err) + return err + } + + err = h(id, j) + if err == nil { + return q.deleteMessage(s, msg, nil) + } + + q.errHandler(id, j, err) + + if !j.Options.CanRetry(attempt) { + return q.deleteMessage(s, msg, err) + } + + // retry after specified duration + _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ + QueueUrl: q.url, + ReceiptHandle: msg.ReceiptHandle, + VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)), + }) + + return err +} + +func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error { + _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle}) + return drr +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.active) == 0 { + return + } + + atomic.StoreInt32(&q.active, 0) + + close(q.wait) + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() +} + +// add job to the queue +func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) { + r, err := s.SendMessage(pack(q.url, j)) + if err != nil { + return "", err + } + + return *r.MessageId, nil +} + +// return queue stats +func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) { + r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{ + QueueUrl: q.url, + AttributeNames: []*string{ + aws.String("ApproximateNumberOfMessages"), + aws.String("ApproximateNumberOfMessagesDelayed"), + aws.String("ApproximateNumberOfMessagesNotVisible"), + }, + }) + + if err != nil { + return nil, err + } + + stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")} + + for a, v := range r.Attributes { + if a == "ApproximateNumberOfMessages" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Queue = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesNotVisible" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Active = int64(v) + } + } + + if a == "ApproximateNumberOfMessagesDelayed" { + if v, err := strconv.Atoi(*v); err == nil { + stat.Delayed = int64(v) + } + } + } + + return stat, nil +} + +// throw handles service, server and pool events. +func (q *queue) report(err error) { + if err != nil { + q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err}) + } +} diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go new file mode 100644 index 00000000..5031571b --- /dev/null +++ b/plugins/jobs/oooold/broker/sqs/stat_test.go @@ -0,0 +1,60 @@ +package sqs + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init(cfg) + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + // unable to use approximated stats + _, err = b.Stat(pipe) + assert.NoError(t, err) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + _, err := b.Stat(pipe) + assert.NoError(t, err) + + close(waitJob) + return nil + } + + <-waitJob + _, err = b.Stat(pipe) + assert.NoError(t, err) +} |