diff options
author | Valery Piashchynski <[email protected]> | 2021-06-15 22:12:32 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-15 22:12:32 +0300 |
commit | d4c92e48bada7593b6fbec612a742c599de6e736 (patch) | |
tree | 53b6fb81987953b71a77ae094e579a0a7daa407c /plugins/jobs/broker/ephemeral | |
parent | 9dc98d43b0c0de3e1e1bd8fdc97c122c7c7c594f (diff) |
- Jobs plugin initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/broker/ephemeral')
-rw-r--r-- | plugins/jobs/broker/ephemeral/broker.go | 174 | ||||
-rw-r--r-- | plugins/jobs/broker/ephemeral/broker_test.go | 221 | ||||
-rw-r--r-- | plugins/jobs/broker/ephemeral/consume_test.go | 253 | ||||
-rw-r--r-- | plugins/jobs/broker/ephemeral/queue.go | 161 | ||||
-rw-r--r-- | plugins/jobs/broker/ephemeral/stat_test.go | 64 |
5 files changed, 873 insertions, 0 deletions
diff --git a/plugins/jobs/broker/ephemeral/broker.go b/plugins/jobs/broker/ephemeral/broker.go new file mode 100644 index 00000000..385bb175 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/broker.go @@ -0,0 +1,174 @@ +package ephemeral + +import ( + "fmt" + "github.com/gofrs/uuid" + "github.com/spiral/jobs/v2" + "sync" +) + +// Broker run queue using local goroutines. +type Broker struct { + lsn func(event int, ctx interface{}) + mu sync.Mutex + wait chan error + stopped chan interface{} + queues map[*jobs.Pipeline]*queue +} + +// Listen attaches server event watcher. +func (b *Broker) Listen(lsn func(event int, ctx interface{})) { + b.lsn = lsn +} + +// Init configures broker. +func (b *Broker) Init() (bool, error) { + b.queues = make(map[*jobs.Pipeline]*queue) + + return true, nil +} + +// Register broker pipeline. +func (b *Broker) Register(pipe *jobs.Pipeline) error { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.queues[pipe]; ok { + return fmt.Errorf("queue `%s` has already been registered", pipe.Name()) + } + + b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0)) + + return nil +} + +// Serve broker pipelines. +func (b *Broker) Serve() error { + // start consuming + b.mu.Lock() + for _, q := range b.queues { + qq := q + if qq.execPool != nil { + go qq.serve() + } + } + b.wait = make(chan error) + b.stopped = make(chan interface{}) + defer close(b.stopped) + + b.mu.Unlock() + + b.throw(jobs.EventBrokerReady, b) + + return <-b.wait +} + +// Stop all pipelines. +func (b *Broker) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return + } + + // stop all consuming + for _, q := range b.queues { + q.stop() + } + + close(b.wait) + <-b.stopped +} + +// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before +// the service is started! +func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + q.stop() + + q.execPool = execPool + q.errHandler = errHandler + + if b.wait != nil { + if q.execPool != nil { + go q.serve() + } + } + + return nil +} + +// Push job into the worker. +func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) { + if err := b.isServing(); err != nil { + return "", err + } + + q := b.queue(pipe) + if q == nil { + return "", fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + id, err := uuid.NewV4() + if err != nil { + return "", err + } + + q.push(id.String(), j, 0, j.Options.DelayDuration()) + + return id.String(), nil +} + +// Stat must consume statistics about given pipeline or return error. +func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) { + if err := b.isServing(); err != nil { + return nil, err + } + + q := b.queue(pipe) + if q == nil { + return nil, fmt.Errorf("undefined queue `%s`", pipe.Name()) + } + + return q.stat(), nil +} + +// check if broker is serving +func (b *Broker) isServing() error { + b.mu.Lock() + defer b.mu.Unlock() + + if b.wait == nil { + return fmt.Errorf("broker is not running") + } + + return nil +} + +// queue returns queue associated with the pipeline. +func (b *Broker) queue(pipe *jobs.Pipeline) *queue { + b.mu.Lock() + defer b.mu.Unlock() + + q, ok := b.queues[pipe] + if !ok { + return nil + } + + return q +} + +// throw handles service, server and pool events. +func (b *Broker) throw(event int, ctx interface{}) { + if b.lsn != nil { + b.lsn(event, ctx) + } +} diff --git a/plugins/jobs/broker/ephemeral/broker_test.go b/plugins/jobs/broker/ephemeral/broker_test.go new file mode 100644 index 00000000..c1b40276 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/broker_test.go @@ -0,0 +1,221 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +var ( + pipe = &jobs.Pipeline{ + "broker": "local", + "name": "default", + } +) + +func TestBroker_Init(t *testing.T) { + b := &Broker{} + ok, err := b.Init() + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestBroker_StopNotStarted(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + b.Stop() +} + +func TestBroker_Register(t *testing.T) { + b := &Broker{} + b.Init() + assert.NoError(t, b.Register(pipe)) +} + +func TestBroker_Register_Twice(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Register(pipe)) + assert.Error(t, b.Register(pipe)) +} + +func TestBroker_Consume_Nil_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + assert.NoError(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_Undefined(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + assert.Error(t, b.Consume(pipe, nil, nil)) +} + +func TestBroker_Consume_BeforeServe(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + assert.NoError(t, b.Consume(pipe, exec, errf)) +} + +func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + err = b.Consume(pipe, nil, nil) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_Consume_Serve_Stop(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + exec := make(chan jobs.Handler) + errf := func(id string, j *jobs.Job, err error) {} + + err = b.Consume(pipe, exec, errf) + if err != nil { + t.Fatal(err) + } + + wait := make(chan interface{}) + go func() { + assert.NoError(t, b.Serve()) + close(wait) + }() + time.Sleep(time.Millisecond * 100) + b.Stop() + + <-wait +} + +func TestBroker_PushToNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRunning(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + _, err = b.Stat(pipe) + assert.Error(t, err) +} + +func TestBroker_PushToNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Push(pipe, &jobs.Job{}) + assert.Error(t, err) +} + +func TestBroker_StatNotRegistered(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + _, err = b.Stat(pipe) + assert.Error(t, err) +} diff --git a/plugins/jobs/broker/ephemeral/consume_test.go b/plugins/jobs/broker/ephemeral/consume_test.go new file mode 100644 index 00000000..d764a984 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/consume_test.go @@ -0,0 +1,253 @@ +package ephemeral + +import ( + "fmt" + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestBroker_Consume_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_ConsumeAfterStart_Job(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob +} + +func TestBroker_Consume_Delayed(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Delay: 1}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + start := time.Now() + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return nil + } + + <-waitJob + + elapsed := time.Since(start) + assert.True(t, elapsed > time.Second) + assert.True(t, elapsed < 2*time.Second) +} + +func TestBroker_Consume_Errored(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + close(errHandled) + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + close(waitJob) + return fmt.Errorf("job failed") + } + + <-waitJob + <-errHandled +} + +func TestBroker_Consume_Errored_Attempts(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + attempts := 0 + errHandled := make(chan interface{}) + errHandler := func(id string, j *jobs.Job, err error) { + assert.Equal(t, "job failed", err.Error()) + attempts++ + errHandled <- nil + } + + exec := make(chan jobs.Handler, 1) + + assert.NoError(t, b.Consume(pipe, exec, errHandler)) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{ + Job: "test", + Payload: "body", + Options: &jobs.Options{Attempts: 3}, + }) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + return fmt.Errorf("job failed") + } + + <-errHandled + <-errHandled + <-errHandled + assert.Equal(t, 3, attempts) +} diff --git a/plugins/jobs/broker/ephemeral/queue.go b/plugins/jobs/broker/ephemeral/queue.go new file mode 100644 index 00000000..a24bc216 --- /dev/null +++ b/plugins/jobs/broker/ephemeral/queue.go @@ -0,0 +1,161 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "sync" + "sync/atomic" + "time" +) + +type queue struct { + on int32 + state *jobs.Stat + + // job pipeline + concurPool chan interface{} + jobs chan *entry + + // on operations + muw sync.Mutex + wg sync.WaitGroup + + // stop channel + wait chan interface{} + + // exec handlers + execPool chan jobs.Handler + errHandler jobs.ErrorHandler +} + +type entry struct { + id string + job *jobs.Job + attempt int +} + +// create new queue +func newQueue(maxConcur int) *queue { + q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)} + + if maxConcur != 0 { + q.concurPool = make(chan interface{}, maxConcur) + for i := 0; i < maxConcur; i++ { + q.concurPool <- nil + } + } + + return q +} + +// serve consumers +func (q *queue) serve() { + q.wait = make(chan interface{}) + atomic.StoreInt32(&q.on, 1) + + for { + e := q.consume() + if e == nil { + q.wg.Wait() + return + } + + if q.concurPool != nil { + <-q.concurPool + } + + atomic.AddInt64(&q.state.Active, 1) + h := <-q.execPool + + go func(h jobs.Handler, e *entry) { + defer q.wg.Done() + + q.do(h, e) + atomic.AddInt64(&q.state.Active, ^int64(0)) + + q.execPool <- h + + if q.concurPool != nil { + q.concurPool <- nil + } + }(h, e) + } +} + +// allocate one job entry +func (q *queue) consume() *entry { + q.muw.Lock() + defer q.muw.Unlock() + + select { + case <-q.wait: + return nil + case e := <-q.jobs: + q.wg.Add(1) + + return e + } +} + +// do singe job +func (q *queue) do(h jobs.Handler, e *entry) { + err := h(e.id, e.job) + + if err == nil { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.errHandler(e.id, e.job, err) + + if !e.job.Options.CanRetry(e.attempt) { + atomic.AddInt64(&q.state.Queue, ^int64(0)) + return + } + + q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration()) +} + +// stop the queue consuming +func (q *queue) stop() { + if atomic.LoadInt32(&q.on) == 0 { + return + } + + close(q.wait) + + q.muw.Lock() + q.wg.Wait() + q.muw.Unlock() + + atomic.StoreInt32(&q.on, 0) +} + +// add job to the queue +func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) { + if delay == 0 { + atomic.AddInt64(&q.state.Queue, 1) + go func() { + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() + + return + } + + atomic.AddInt64(&q.state.Delayed, 1) + go func() { + time.Sleep(delay) + atomic.AddInt64(&q.state.Delayed, ^int64(0)) + atomic.AddInt64(&q.state.Queue, 1) + + q.jobs <- &entry{id: id, job: j, attempt: attempt} + }() +} + +func (q *queue) stat() *jobs.Stat { + return &jobs.Stat{ + InternalName: ":memory:", + Queue: atomic.LoadInt64(&q.state.Queue), + Active: atomic.LoadInt64(&q.state.Active), + Delayed: atomic.LoadInt64(&q.state.Delayed), + } +} diff --git a/plugins/jobs/broker/ephemeral/stat_test.go b/plugins/jobs/broker/ephemeral/stat_test.go new file mode 100644 index 00000000..0894323c --- /dev/null +++ b/plugins/jobs/broker/ephemeral/stat_test.go @@ -0,0 +1,64 @@ +package ephemeral + +import ( + "github.com/spiral/jobs/v2" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBroker_Stat(t *testing.T) { + b := &Broker{} + _, err := b.Init() + if err != nil { + t.Fatal(err) + } + err = b.Register(pipe) + if err != nil { + t.Fatal(err) + } + + ready := make(chan interface{}) + b.Listen(func(event int, ctx interface{}) { + if event == jobs.EventBrokerReady { + close(ready) + } + }) + + exec := make(chan jobs.Handler, 1) + + go func() { assert.NoError(t, b.Serve()) }() + defer b.Stop() + + <-ready + + jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}}) + + assert.NotEqual(t, "", jid) + assert.NoError(t, perr) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Queue) + assert.Equal(t, int64(0), stat.Active) + + assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {})) + + waitJob := make(chan interface{}) + exec <- func(id string, j *jobs.Job) error { + assert.Equal(t, jid, id) + assert.Equal(t, "body", j.Payload) + + stat, err := b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(1), stat.Active) + + close(waitJob) + return nil + } + + <-waitJob + stat, err = b.Stat(pipe) + assert.NoError(t, err) + assert.Equal(t, int64(0), stat.Queue) + assert.Equal(t, int64(0), stat.Active) +} |