diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 53 |
1 files changed, 40 insertions, 13 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index b51af322..559cb2e9 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -5,6 +5,7 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -23,19 +24,21 @@ type Config struct { type JobBroker struct { cfg *Config log logger.Logger - queues sync.Map + eh events.Handler + pipeline sync.Map pq priorityqueue.Queue localQueue chan *Item stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ log: log, pq: pq, + eh: eh, stopCh: make(chan struct{}, 1), } @@ -57,10 +60,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, pq return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, pq priorityqueue.Queue) (*JobBroker, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ log: log, pq: pq, + eh: eh, stopCh: make(chan struct{}, 1), } @@ -79,7 +83,7 @@ func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if b, ok := j.pipeline.Load(job.Options.Pipeline); ok { if !b.(bool) { return errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } @@ -119,37 +123,51 @@ func (j *JobBroker) consume() { func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues.Load(pipeline.Name()); ok { + if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues.Store(pipeline.Name(), true) + j.pipeline.Store(pipeline.Name(), true) return nil } func (j *JobBroker) Pause(pipeline string) { - if q, ok := j.queues.Load(pipeline); ok { + if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off - j.queues.Store(pipeline, false) + j.pipeline.Store(pipeline, false) } } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) } func (j *JobBroker) Resume(pipeline string) { - if q, ok := j.queues.Load(pipeline); ok { + if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned off - j.queues.Store(pipeline, true) + j.pipeline.Store(pipeline, true) } } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipeline, + Start: time.Now(), + Elapsed: 0, + }) } func (j *JobBroker) List() []string { out := make([]string, 0, 2) - j.queues.Range(func(key, value interface{}) bool { + j.pipeline.Range(func(key, value interface{}) bool { pipe := key.(string) out = append(out, pipe) return true @@ -164,13 +182,22 @@ func (j *JobBroker) Run(_ *pipeline.Pipeline) error { } func (j *JobBroker) Stop() error { - j.queues.Range(func(key, _ interface{}) bool { - j.queues.Delete(key) + var pipe string + j.pipeline.Range(func(key, _ interface{}) bool { + pipe = key.(string) + j.pipeline.Delete(key) return true }) // return from the consumer j.stopCh <- struct{}{} + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + return nil } |