summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker_test.go')
-rw-r--r--plugins/jobs/oooold/broker_test.go314
1 files changed, 0 insertions, 314 deletions
diff --git a/plugins/jobs/oooold/broker_test.go b/plugins/jobs/oooold/broker_test.go
deleted file mode 100644
index b93eac51..00000000
--- a/plugins/jobs/oooold/broker_test.go
+++ /dev/null
@@ -1,314 +0,0 @@
-package oooold
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "sync"
- "sync/atomic"
- "time"
-)
-
-// testBroker run testQueue using local goroutines.
-type testBroker struct {
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*Pipeline]*testQueue
-}
-
-// Listen attaches server event watcher.
-func (b *testBroker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *testBroker) Init() (bool, error) {
- b.queues = make(map[*Pipeline]*testQueue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *testBroker) Register(pipe *Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("testQueue `%s` has already been registered", pipe.Name())
- }
-
- b.queues[pipe] = newQueue()
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *testBroker) Serve() error {
- // start pipelines
- b.mu.Lock()
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve()
- }
- }
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *testBroker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- // stop all pipelines
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
-// the service is started!
-func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.wait != nil {
- if q.execPool != nil {
- go q.serve()
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *testBroker) Push(pipe *Pipeline, j *Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q.push(id.String(), j, 0, j.Options.DelayDuration())
-
- return id.String(), nil
-}
-
-// Stat must consume statistics about given pipeline or return error.
-func (b *testBroker) Stat(pipe *Pipeline) (stat *Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- return q.stat(), nil
-}
-
-// check if broker is serving
-func (b *testBroker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// testQueue returns testQueue associated with the pipeline.
-func (b *testBroker) queue(pipe *Pipeline) *testQueue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *testBroker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
-
-type testQueue struct {
- active int32
- st *Stat
-
- // job pipeline
- jobs chan *entry
-
- // pipelines operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan Handler
- errHandler ErrorHandler
-}
-
-type entry struct {
- id string
- job *Job
- attempt int
-}
-
-// create new testQueue
-func newQueue() *testQueue {
- return &testQueue{st: &Stat{}, jobs: make(chan *entry)}
-}
-
-// todo NOT USED
-// associate testQueue with new do pool
-//func (q *testQueue) configure(execPool chan Handler, err ErrorHandler) error {
-// q.execPool = execPool
-// q.errHandler = err
-//
-// return nil
-//}
-
-// serve consumers
-func (q *testQueue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.active, 1)
-
- for {
- e := q.consume()
- if e == nil {
- return
- }
-
- atomic.AddInt64(&q.st.Active, 1)
- h := <-q.execPool
- go func(e *entry) {
- q.do(h, e)
- atomic.AddInt64(&q.st.Active, ^int64(0))
- q.execPool <- h
- q.wg.Done()
- }(e)
- }
-}
-
-// allocate one job entry
-func (q *testQueue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *testQueue) do(h Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.st.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.st.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the testQueue pipelines
-func (q *testQueue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- close(q.wait)
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// add job to the testQueue
-func (q *testQueue) push(id string, j *Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.st.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.st.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.st.Delayed, ^int64(0))
- atomic.AddInt64(&q.st.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *testQueue) stat() *Stat {
- return &Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.st.Queue),
- Active: atomic.LoadInt64(&q.st.Active),
- Delayed: atomic.LoadInt64(&q.st.Delayed),
- }
-}