summaryrefslogtreecommitdiff
path: root/plugins/jobs/broker/ephemeral
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
committerValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
commitd4c92e48bada7593b6fbec612a742c599de6e736 (patch)
tree53b6fb81987953b71a77ae094e579a0a7daa407c /plugins/jobs/broker/ephemeral
parent9dc98d43b0c0de3e1e1bd8fdc97c122c7c7c594f (diff)
- Jobs plugin initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/broker/ephemeral')
-rw-r--r--plugins/jobs/broker/ephemeral/broker.go174
-rw-r--r--plugins/jobs/broker/ephemeral/broker_test.go221
-rw-r--r--plugins/jobs/broker/ephemeral/consume_test.go253
-rw-r--r--plugins/jobs/broker/ephemeral/queue.go161
-rw-r--r--plugins/jobs/broker/ephemeral/stat_test.go64
5 files changed, 873 insertions, 0 deletions
diff --git a/plugins/jobs/broker/ephemeral/broker.go b/plugins/jobs/broker/ephemeral/broker.go
new file mode 100644
index 00000000..385bb175
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/broker.go
@@ -0,0 +1,174 @@
+package ephemeral
+
+import (
+ "fmt"
+ "github.com/gofrs/uuid"
+ "github.com/spiral/jobs/v2"
+ "sync"
+)
+
+// Broker run queue using local goroutines.
+type Broker struct {
+ lsn func(event int, ctx interface{})
+ mu sync.Mutex
+ wait chan error
+ stopped chan interface{}
+ queues map[*jobs.Pipeline]*queue
+}
+
+// Listen attaches server event watcher.
+func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
+ b.lsn = lsn
+}
+
+// Init configures broker.
+func (b *Broker) Init() (bool, error) {
+ b.queues = make(map[*jobs.Pipeline]*queue)
+
+ return true, nil
+}
+
+// Register broker pipeline.
+func (b *Broker) Register(pipe *jobs.Pipeline) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if _, ok := b.queues[pipe]; ok {
+ return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
+ }
+
+ b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
+
+ return nil
+}
+
+// Serve broker pipelines.
+func (b *Broker) Serve() error {
+ // start consuming
+ b.mu.Lock()
+ for _, q := range b.queues {
+ qq := q
+ if qq.execPool != nil {
+ go qq.serve()
+ }
+ }
+ b.wait = make(chan error)
+ b.stopped = make(chan interface{})
+ defer close(b.stopped)
+
+ b.mu.Unlock()
+
+ b.throw(jobs.EventBrokerReady, b)
+
+ return <-b.wait
+}
+
+// Stop all pipelines.
+func (b *Broker) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return
+ }
+
+ // stop all consuming
+ for _, q := range b.queues {
+ q.stop()
+ }
+
+ close(b.wait)
+ <-b.stopped
+}
+
+// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
+// the service is started!
+func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ q.stop()
+
+ q.execPool = execPool
+ q.errHandler = errHandler
+
+ if b.wait != nil {
+ if q.execPool != nil {
+ go q.serve()
+ }
+ }
+
+ return nil
+}
+
+// Push job into the worker.
+func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
+ if err := b.isServing(); err != nil {
+ return "", err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ id, err := uuid.NewV4()
+ if err != nil {
+ return "", err
+ }
+
+ q.push(id.String(), j, 0, j.Options.DelayDuration())
+
+ return id.String(), nil
+}
+
+// Stat must consume statistics about given pipeline or return error.
+func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
+ if err := b.isServing(); err != nil {
+ return nil, err
+ }
+
+ q := b.queue(pipe)
+ if q == nil {
+ return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
+ }
+
+ return q.stat(), nil
+}
+
+// check if broker is serving
+func (b *Broker) isServing() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if b.wait == nil {
+ return fmt.Errorf("broker is not running")
+ }
+
+ return nil
+}
+
+// queue returns queue associated with the pipeline.
+func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ q, ok := b.queues[pipe]
+ if !ok {
+ return nil
+ }
+
+ return q
+}
+
+// throw handles service, server and pool events.
+func (b *Broker) throw(event int, ctx interface{}) {
+ if b.lsn != nil {
+ b.lsn(event, ctx)
+ }
+}
diff --git a/plugins/jobs/broker/ephemeral/broker_test.go b/plugins/jobs/broker/ephemeral/broker_test.go
new file mode 100644
index 00000000..c1b40276
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/broker_test.go
@@ -0,0 +1,221 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+var (
+ pipe = &jobs.Pipeline{
+ "broker": "local",
+ "name": "default",
+ }
+)
+
+func TestBroker_Init(t *testing.T) {
+ b := &Broker{}
+ ok, err := b.Init()
+ assert.True(t, ok)
+ assert.NoError(t, err)
+}
+
+func TestBroker_StopNotStarted(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ b.Stop()
+}
+
+func TestBroker_Register(t *testing.T) {
+ b := &Broker{}
+ b.Init()
+ assert.NoError(t, b.Register(pipe))
+}
+
+func TestBroker_Register_Twice(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Register(pipe))
+ assert.Error(t, b.Register(pipe))
+}
+
+func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NoError(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_Undefined(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Error(t, b.Consume(pipe, nil, nil))
+}
+
+func TestBroker_Consume_BeforeServe(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ assert.NoError(t, b.Consume(pipe, exec, errf))
+}
+
+func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = b.Consume(pipe, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_Consume_Serve_Stop(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ exec := make(chan jobs.Handler)
+ errf := func(id string, j *jobs.Job, err error) {}
+
+ err = b.Consume(pipe, exec, errf)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wait := make(chan interface{})
+ go func() {
+ assert.NoError(t, b.Serve())
+ close(wait)
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b.Stop()
+
+ <-wait
+}
+
+func TestBroker_PushToNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRunning(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
+
+func TestBroker_PushToNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Push(pipe, &jobs.Job{})
+ assert.Error(t, err)
+}
+
+func TestBroker_StatNotRegistered(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ _, err = b.Stat(pipe)
+ assert.Error(t, err)
+}
diff --git a/plugins/jobs/broker/ephemeral/consume_test.go b/plugins/jobs/broker/ephemeral/consume_test.go
new file mode 100644
index 00000000..d764a984
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/consume_test.go
@@ -0,0 +1,253 @@
+package ephemeral
+
+import (
+ "fmt"
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestBroker_Consume_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+}
+
+func TestBroker_Consume_Delayed(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Delay: 1},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ start := time.Now()
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+
+ elapsed := time.Since(start)
+ assert.True(t, elapsed > time.Second)
+ assert.True(t, elapsed < 2*time.Second)
+}
+
+func TestBroker_Consume_Errored(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ close(errHandled)
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ close(waitJob)
+ return fmt.Errorf("job failed")
+ }
+
+ <-waitJob
+ <-errHandled
+}
+
+func TestBroker_Consume_Errored_Attempts(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ attempts := 0
+ errHandled := make(chan interface{})
+ errHandler := func(id string, j *jobs.Job, err error) {
+ assert.Equal(t, "job failed", err.Error())
+ attempts++
+ errHandled <- nil
+ }
+
+ exec := make(chan jobs.Handler, 1)
+
+ assert.NoError(t, b.Consume(pipe, exec, errHandler))
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{
+ Job: "test",
+ Payload: "body",
+ Options: &jobs.Options{Attempts: 3},
+ })
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+ return fmt.Errorf("job failed")
+ }
+
+ <-errHandled
+ <-errHandled
+ <-errHandled
+ assert.Equal(t, 3, attempts)
+}
diff --git a/plugins/jobs/broker/ephemeral/queue.go b/plugins/jobs/broker/ephemeral/queue.go
new file mode 100644
index 00000000..a24bc216
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/queue.go
@@ -0,0 +1,161 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type queue struct {
+ on int32
+ state *jobs.Stat
+
+ // job pipeline
+ concurPool chan interface{}
+ jobs chan *entry
+
+ // on operations
+ muw sync.Mutex
+ wg sync.WaitGroup
+
+ // stop channel
+ wait chan interface{}
+
+ // exec handlers
+ execPool chan jobs.Handler
+ errHandler jobs.ErrorHandler
+}
+
+type entry struct {
+ id string
+ job *jobs.Job
+ attempt int
+}
+
+// create new queue
+func newQueue(maxConcur int) *queue {
+ q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
+
+ if maxConcur != 0 {
+ q.concurPool = make(chan interface{}, maxConcur)
+ for i := 0; i < maxConcur; i++ {
+ q.concurPool <- nil
+ }
+ }
+
+ return q
+}
+
+// serve consumers
+func (q *queue) serve() {
+ q.wait = make(chan interface{})
+ atomic.StoreInt32(&q.on, 1)
+
+ for {
+ e := q.consume()
+ if e == nil {
+ q.wg.Wait()
+ return
+ }
+
+ if q.concurPool != nil {
+ <-q.concurPool
+ }
+
+ atomic.AddInt64(&q.state.Active, 1)
+ h := <-q.execPool
+
+ go func(h jobs.Handler, e *entry) {
+ defer q.wg.Done()
+
+ q.do(h, e)
+ atomic.AddInt64(&q.state.Active, ^int64(0))
+
+ q.execPool <- h
+
+ if q.concurPool != nil {
+ q.concurPool <- nil
+ }
+ }(h, e)
+ }
+}
+
+// allocate one job entry
+func (q *queue) consume() *entry {
+ q.muw.Lock()
+ defer q.muw.Unlock()
+
+ select {
+ case <-q.wait:
+ return nil
+ case e := <-q.jobs:
+ q.wg.Add(1)
+
+ return e
+ }
+}
+
+// do singe job
+func (q *queue) do(h jobs.Handler, e *entry) {
+ err := h(e.id, e.job)
+
+ if err == nil {
+ atomic.AddInt64(&q.state.Queue, ^int64(0))
+ return
+ }
+
+ q.errHandler(e.id, e.job, err)
+
+ if !e.job.Options.CanRetry(e.attempt) {
+ atomic.AddInt64(&q.state.Queue, ^int64(0))
+ return
+ }
+
+ q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
+}
+
+// stop the queue consuming
+func (q *queue) stop() {
+ if atomic.LoadInt32(&q.on) == 0 {
+ return
+ }
+
+ close(q.wait)
+
+ q.muw.Lock()
+ q.wg.Wait()
+ q.muw.Unlock()
+
+ atomic.StoreInt32(&q.on, 0)
+}
+
+// add job to the queue
+func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
+ if delay == 0 {
+ atomic.AddInt64(&q.state.Queue, 1)
+ go func() {
+ q.jobs <- &entry{id: id, job: j, attempt: attempt}
+ }()
+
+ return
+ }
+
+ atomic.AddInt64(&q.state.Delayed, 1)
+ go func() {
+ time.Sleep(delay)
+ atomic.AddInt64(&q.state.Delayed, ^int64(0))
+ atomic.AddInt64(&q.state.Queue, 1)
+
+ q.jobs <- &entry{id: id, job: j, attempt: attempt}
+ }()
+}
+
+func (q *queue) stat() *jobs.Stat {
+ return &jobs.Stat{
+ InternalName: ":memory:",
+ Queue: atomic.LoadInt64(&q.state.Queue),
+ Active: atomic.LoadInt64(&q.state.Active),
+ Delayed: atomic.LoadInt64(&q.state.Delayed),
+ }
+}
diff --git a/plugins/jobs/broker/ephemeral/stat_test.go b/plugins/jobs/broker/ephemeral/stat_test.go
new file mode 100644
index 00000000..0894323c
--- /dev/null
+++ b/plugins/jobs/broker/ephemeral/stat_test.go
@@ -0,0 +1,64 @@
+package ephemeral
+
+import (
+ "github.com/spiral/jobs/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestBroker_Stat(t *testing.T) {
+ b := &Broker{}
+ _, err := b.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = b.Register(pipe)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ready := make(chan interface{})
+ b.Listen(func(event int, ctx interface{}) {
+ if event == jobs.EventBrokerReady {
+ close(ready)
+ }
+ })
+
+ exec := make(chan jobs.Handler, 1)
+
+ go func() { assert.NoError(t, b.Serve()) }()
+ defer b.Stop()
+
+ <-ready
+
+ jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
+
+ assert.NotEqual(t, "", jid)
+ assert.NoError(t, perr)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+
+ assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
+
+ waitJob := make(chan interface{})
+ exec <- func(id string, j *jobs.Job) error {
+ assert.Equal(t, jid, id)
+ assert.Equal(t, "body", j.Payload)
+
+ stat, err := b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), stat.Active)
+
+ close(waitJob)
+ return nil
+ }
+
+ <-waitJob
+ stat, err = b.Stat(pipe)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), stat.Queue)
+ assert.Equal(t, int64(0), stat.Active)
+}