summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/ephemeral
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/ephemeral')
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker.go174
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker_test.go221
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/consume_test.go253
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/queue.go161
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/stat_test.go64
5 files changed, 0 insertions, 873 deletions
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go
deleted file mode 100644
index 385bb175..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run queue using local goroutines.
-type Broker struct {
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init() (bool, error) {
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() error {
- // start consuming
- b.mu.Lock()
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve()
- }
- }
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- // stop all consuming
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.wait != nil {
- if q.execPool != nil {
- go q.serve()
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q.push(id.String(), j, 0, j.Options.DelayDuration())
-
- return id.String(), nil
-}
-
-// Stat must consume statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(), nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go
deleted file mode 100644
index c1b40276..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go
+++ /dev/null
@@ -1,221 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "local",
- "name": "default",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init()
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- b.Init()
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go
deleted file mode 100644
index d764a984..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go
+++ /dev/null
@@ -1,253 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go
deleted file mode 100644
index a24bc216..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/queue.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- on int32
- state *jobs.Stat
-
- // job pipeline
- concurPool chan interface{}
- jobs chan *entry
-
- // on operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id string
- job *jobs.Job
- attempt int
-}
-
-// create new queue
-func newQueue(maxConcur int) *queue {
- q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
-
- if maxConcur != 0 {
- q.concurPool = make(chan interface{}, maxConcur)
- for i := 0; i < maxConcur; i++ {
- q.concurPool <- nil
- }
- }
-
- return q
-}
-
-// serve consumers
-func (q *queue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.on, 1)
-
- for {
- e := q.consume()
- if e == nil {
- q.wg.Wait()
- return
- }
-
- if q.concurPool != nil {
- <-q.concurPool
- }
-
- atomic.AddInt64(&q.state.Active, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, e *entry) {
- defer q.wg.Done()
-
- q.do(h, e)
- atomic.AddInt64(&q.state.Active, ^int64(0))
-
- q.execPool <- h
-
- if q.concurPool != nil {
- q.concurPool <- nil
- }
- }(h, e)
- }
-}
-
-// allocate one job entry
-func (q *queue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *queue) do(h jobs.Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.on) == 0 {
- return
- }
-
- close(q.wait)
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-
- atomic.StoreInt32(&q.on, 0)
-}
-
-// add job to the queue
-func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.state.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.state.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.state.Delayed, ^int64(0))
- atomic.AddInt64(&q.state.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *queue) stat() *jobs.Stat {
- return &jobs.Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.state.Queue),
- Active: atomic.LoadInt64(&q.state.Active),
- Delayed: atomic.LoadInt64(&q.state.Delayed),
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go
deleted file mode 100644
index 0894323c..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}