diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/ephemeral')
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/broker.go | 174 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/broker_test.go | 221 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/consume_test.go | 253 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/queue.go | 161 | ||||
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/stat_test.go | 64 |
5 files changed, 0 insertions, 873 deletions
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go deleted file mode 100644 index 385bb175..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker.go +++ /dev/null @@ -1,174 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/gofrs/uuid" - "github.com/spiral/jobs/v2" - "sync" -) - -// Broker run queue using local goroutines. -type Broker struct { - lsn func(event int, ctx interface{}) - mu sync.Mutex - wait chan error - stopped chan interface{} - queues map[*jobs.Pipeline]*queue -} - -// Listen attaches server event watcher. -func (b *Broker) Listen(lsn func(event int, ctx interface{})) { - b.lsn = lsn -} - -// Init configures broker. -func (b *Broker) Init() (bool, error) { - b.queues = make(map[*jobs.Pipeline]*queue) - - return true, nil -} - -// Register broker pipeline. -func (b *Broker) Register(pipe *jobs.Pipeline) error { - b.mu.Lock() - defer b.mu.Unlock() - - if _, ok := b.queues[pipe]; ok { - return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) - } - - b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) - - return nil -} - -// Serve broker pipelines. -func (b *Broker) Serve() error { - // start consuming - b.mu.Lock() - for _, q := range b.queues { - qq := q - if qq.execPool != nil { - go qq.serve() - } - } - b.wait = make(chan error) - b.stopped = make(chan interface{}) - defer close(b.stopped) - - b.mu.Unlock() - - b.throw(jobs.EventBrokerReady, b) - - return <-b.wait -} - -// Stop all pipelines. -func (b *Broker) Stop() { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return - } - - // stop all consuming - for _, q := range b.queues { - q.stop() - } - - close(b.wait) - <-b.stopped -} - -// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before -// the service is started! -func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - q.stop() - - q.execPool = execPool - q.errHandler = errHandler - - if b.wait != nil { - if q.execPool != nil { - go q.serve() - } - } - - return nil -} - -// Push job into the worker. -func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { - if err := b.isServing(); err != nil { - return "", err - } - - q := b.queue(pipe) - if q == nil { - return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - id, err := uuid.NewV4() - if err != nil { - return "", err - } - - q.push(id.String(), j, 0, j.Options.DelayDuration()) - - return id.String(), nil -} - -// Stat must consume statistics about given pipeline or return error. -func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { - if err := b.isServing(); err != nil { - return nil, err - } - - q := b.queue(pipe) - if q == nil { - return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) - } - - return q.stat(), nil -} - -// check if broker is serving -func (b *Broker) isServing() error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.wait == nil { - return fmt.Errorf("broker is not running") - } - - return nil -} - -// queue returns queue associated with the pipeline. -func (b *Broker) queue(pipe *jobs.Pipeline) *queue { - b.mu.Lock() - defer b.mu.Unlock() - - q, ok := b.queues[pipe] - if !ok { - return nil - } - - return q -} - -// throw handles service, server and pool events. -func (b *Broker) throw(event int, ctx interface{}) { - if b.lsn != nil { - b.lsn(event, ctx) - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go deleted file mode 100644 index c1b40276..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go +++ /dev/null @@ -1,221 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -var ( - pipe = &jobs.Pipeline{ - "broker": "local", - "name": "default", - } -) - -func TestBroker_Init(t *testing.T) { - b := &Broker{} - ok, err := b.Init() - assert.True(t, ok) - assert.NoError(t, err) -} - -func TestBroker_StopNotStarted(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - b.Stop() -} - -func TestBroker_Register(t *testing.T) { - b := &Broker{} - b.Init() - assert.NoError(t, b.Register(pipe)) -} - -func TestBroker_Register_Twice(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Register(pipe)) - assert.Error(t, b.Register(pipe)) -} - -func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - assert.NoError(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_Undefined(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - assert.Error(t, b.Consume(pipe, nil, nil)) -} - -func TestBroker_Consume_BeforeServe(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - assert.NoError(t, b.Consume(pipe, exec, errf)) -} - -func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - err = b.Consume(pipe, nil, nil) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_Consume_Serve_Stop(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - exec := make(chan jobs.Handler) - errf := func(id string, j *jobs.Job, err error) {} - - err = b.Consume(pipe, exec, errf) - if err != nil { - t.Fatal(err) - } - - wait := make(chan interface{}) - go func() { - assert.NoError(t, b.Serve()) - close(wait) - }() - time.Sleep(time.Millisecond * 100) - b.Stop() - - <-wait -} - -func TestBroker_PushToNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRunning(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - _, err = b.Stat(pipe) - assert.Error(t, err) -} - -func TestBroker_PushToNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Push(pipe, &jobs.Job{}) - assert.Error(t, err) -} - -func TestBroker_StatNotRegistered(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - _, err = b.Stat(pipe) - assert.Error(t, err) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go deleted file mode 100644 index d764a984..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go +++ /dev/null @@ -1,253 +0,0 @@ -package ephemeral - -import ( - "fmt" - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestBroker_Consume_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_ConsumeAfterStart_Job(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob -} - -func TestBroker_Consume_Delayed(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Delay: 1}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - start := time.Now() - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return nil - } - - <-waitJob - - elapsed := time.Since(start) - assert.True(t, elapsed > time.Second) - assert.True(t, elapsed < 2*time.Second) -} - -func TestBroker_Consume_Errored(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - close(errHandled) - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - close(waitJob) - return fmt.Errorf("job failed") - } - - <-waitJob - <-errHandled -} - -func TestBroker_Consume_Errored_Attempts(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - attempts := 0 - errHandled := make(chan interface{}) - errHandler := func(id string, j *jobs.Job, err error) { - assert.Equal(t, "job failed", err.Error()) - attempts++ - errHandled <- nil - } - - exec := make(chan jobs.Handler, 1) - - assert.NoError(t, b.Consume(pipe, exec, errHandler)) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{ - Job: "test", - Payload: "body", - Options: &jobs.Options{Attempts: 3}, - }) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - return fmt.Errorf("job failed") - } - - <-errHandled - <-errHandled - <-errHandled - assert.Equal(t, 3, attempts) -} diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go deleted file mode 100644 index a24bc216..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/queue.go +++ /dev/null @@ -1,161 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "sync" - "sync/atomic" - "time" -) - -type queue struct { - on int32 - state *jobs.Stat - - // job pipeline - concurPool chan interface{} - jobs chan *entry - - // on operations - muw sync.Mutex - wg sync.WaitGroup - - // stop channel - wait chan interface{} - - // exec handlers - execPool chan jobs.Handler - errHandler jobs.ErrorHandler -} - -type entry struct { - id string - job *jobs.Job - attempt int -} - -// create new queue -func newQueue(maxConcur int) *queue { - q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} - - if maxConcur != 0 { - q.concurPool = make(chan interface{}, maxConcur) - for i := 0; i < maxConcur; i++ { - q.concurPool <- nil - } - } - - return q -} - -// serve consumers -func (q *queue) serve() { - q.wait = make(chan interface{}) - atomic.StoreInt32(&q.on, 1) - - for { - e := q.consume() - if e == nil { - q.wg.Wait() - return - } - - if q.concurPool != nil { - <-q.concurPool - } - - atomic.AddInt64(&q.state.Active, 1) - h := <-q.execPool - - go func(h jobs.Handler, e *entry) { - defer q.wg.Done() - - q.do(h, e) - atomic.AddInt64(&q.state.Active, ^int64(0)) - - q.execPool <- h - - if q.concurPool != nil { - q.concurPool <- nil - } - }(h, e) - } -} - -// allocate one job entry -func (q *queue) consume() *entry { - q.muw.Lock() - defer q.muw.Unlock() - - select { - case <-q.wait: - return nil - case e := <-q.jobs: - q.wg.Add(1) - - return e - } -} - -// do singe job -func (q *queue) do(h jobs.Handler, e *entry) { - err := h(e.id, e.job) - - if err == nil { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.errHandler(e.id, e.job, err) - - if !e.job.Options.CanRetry(e.attempt) { - atomic.AddInt64(&q.state.Queue, ^int64(0)) - return - } - - q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) -} - -// stop the queue consuming -func (q *queue) stop() { - if atomic.LoadInt32(&q.on) == 0 { - return - } - - close(q.wait) - - q.muw.Lock() - q.wg.Wait() - q.muw.Unlock() - - atomic.StoreInt32(&q.on, 0) -} - -// add job to the queue -func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { - if delay == 0 { - atomic.AddInt64(&q.state.Queue, 1) - go func() { - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() - - return - } - - atomic.AddInt64(&q.state.Delayed, 1) - go func() { - time.Sleep(delay) - atomic.AddInt64(&q.state.Delayed, ^int64(0)) - atomic.AddInt64(&q.state.Queue, 1) - - q.jobs <- &entry{id: id, job: j, attempt: attempt} - }() -} - -func (q *queue) stat() *jobs.Stat { - return &jobs.Stat{ - InternalName: ":memory:", - Queue: atomic.LoadInt64(&q.state.Queue), - Active: atomic.LoadInt64(&q.state.Active), - Delayed: atomic.LoadInt64(&q.state.Delayed), - } -} diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go deleted file mode 100644 index 0894323c..00000000 --- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/jobs/v2" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestBroker_Stat(t *testing.T) { - b := &Broker{} - _, err := b.Init() - if err != nil { - t.Fatal(err) - } - err = b.Register(pipe) - if err != nil { - t.Fatal(err) - } - - ready := make(chan interface{}) - b.Listen(func(event int, ctx interface{}) { - if event == jobs.EventBrokerReady { - close(ready) - } - }) - - exec := make(chan jobs.Handler, 1) - - go func() { assert.NoError(t, b.Serve()) }() - defer b.Stop() - - <-ready - - jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) - - assert.NotEqual(t, "", jid) - assert.NoError(t, perr) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Queue) - assert.Equal(t, int64(0), stat.Active) - - assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) - - waitJob := make(chan interface{}) - exec <- func(id string, j *jobs.Job) error { - assert.Equal(t, jid, id) - assert.Equal(t, "body", j.Payload) - - stat, err := b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(1), stat.Active) - - close(waitJob) - return nil - } - - <-waitJob - stat, err = b.Stat(pipe) - assert.NoError(t, err) - assert.Equal(t, int64(0), stat.Queue) - assert.Equal(t, int64(0), stat.Active) -} |