summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/ephemeral/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/ephemeral/queue.go')
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/queue.go161
1 files changed, 0 insertions, 161 deletions
diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go
deleted file mode 100644
index a24bc216..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/queue.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- on int32
- state *jobs.Stat
-
- // job pipeline
- concurPool chan interface{}
- jobs chan *entry
-
- // on operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id string
- job *jobs.Job
- attempt int
-}
-
-// create new queue
-func newQueue(maxConcur int) *queue {
- q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
-
- if maxConcur != 0 {
- q.concurPool = make(chan interface{}, maxConcur)
- for i := 0; i < maxConcur; i++ {
- q.concurPool <- nil
- }
- }
-
- return q
-}
-
-// serve consumers
-func (q *queue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.on, 1)
-
- for {
- e := q.consume()
- if e == nil {
- q.wg.Wait()
- return
- }
-
- if q.concurPool != nil {
- <-q.concurPool
- }
-
- atomic.AddInt64(&q.state.Active, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, e *entry) {
- defer q.wg.Done()
-
- q.do(h, e)
- atomic.AddInt64(&q.state.Active, ^int64(0))
-
- q.execPool <- h
-
- if q.concurPool != nil {
- q.concurPool <- nil
- }
- }(h, e)
- }
-}
-
-// allocate one job entry
-func (q *queue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *queue) do(h jobs.Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.on) == 0 {
- return
- }
-
- close(q.wait)
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-
- atomic.StoreInt32(&q.on, 0)
-}
-
-// add job to the queue
-func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.state.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.state.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.state.Delayed, ^int64(0))
- atomic.AddInt64(&q.state.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *queue) stat() *jobs.Stat {
- return &jobs.Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.state.Queue),
- Active: atomic.LoadInt64(&q.state.Active),
- Delayed: atomic.LoadInt64(&q.state.Delayed),
- }
-}