diff options
Diffstat (limited to 'plugins/jobs/oooold/broker/ephemeral/queue.go')
-rw-r--r-- | plugins/jobs/oooold/broker/ephemeral/queue.go | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go new file mode 100644 index 00000000..a24bc216 --- /dev/null +++ b/plugins/jobs/oooold/broker/ephemeral/queue.go @@ -0,0 +1,161 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + on int32 + state *jobs.Stat + + // job pipeline + concurPool chan interface{} + jobs chan *entry + + // on operations + muw sync.Mutex + wg sync.WaitGroup + + // stop channel + wait chan interface{} + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id string + job *jobs.Job + attempt int +} + +// create new queue +func newQueue(maxConcur int) *queue { + q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} + + if maxConcur != 0 { + q.concurPool = make(chan interface{}, maxConcur) + for i := 0; i < maxConcur; i++ { + q.concurPool <- nil + } + } + + return q +} + +// serve consumers +func (q *queue) serve() { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.on, 1) + + for { + e := q.consume() + if e == nil { + q.wg.Wait() + return + } + + if q.concurPool != nil { + <-q.concurPool + } + + atomic.AddInt64(&q.state.Active, 1) + h := <-q.execPool + + go func(h jobs.Handler, e *entry) { + defer q.wg.Done() + + q.do(h, e) + atomic.AddInt64(&q.state.Active, ^int64(0)) + + q.execPool <- h + + if q.concurPool != nil { + q.concurPool <- nil + } + }(h, e) + } +} + +// allocate one job entry +func (q *queue) consume() *entry { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil + case e := <-q.jobs: + q.wg.Add(1) + + return e + } +} + +// do singe job +func (q *queue) do(h jobs.Handler, e *entry) { + err := h(e.id, e.job) + + if err == nil { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.errHandler(e.id, e.job, err) + + if !e.job.Options.CanRetry(e.attempt) { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.on) == 0 { + return + } + + close(q.wait) + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() + + atomic.StoreInt32(&q.on, 0) +} + +// add job to the queue +func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { + if delay == 0 { + atomic.AddInt64(&q.state.Queue, 1) + go func() { + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() + + return + } + + atomic.AddInt64(&q.state.Delayed, 1) + go func() { + time.Sleep(delay) + atomic.AddInt64(&q.state.Delayed, ^int64(0)) + atomic.AddInt64(&q.state.Queue, 1) + + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() +} + +func (q *queue) stat() *jobs.Stat { + return &jobs.Stat{ + InternalName: ":memory:", + Queue: atomic.LoadInt64(&q.state.Queue), + Active: atomic.LoadInt64(&q.state.Active), + Delayed: atomic.LoadInt64(&q.state.Delayed), + } +} |