diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 82 |
1 files changed, 58 insertions, 24 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 3eb20c27..4bbb4095 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,70 +1,104 @@ package ephemeral import ( + "sync" + "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { - queues map[string]bool + queues sync.Map pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - queues: make(map[string]bool), + queues: sync.Map{}, pq: q, } return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (string, error) { +func (j *JobBroker) Push(job *structs.Job) (*string, error) { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues[job.Options.Pipeline]; ok { - if !b { - return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } if job.Options.Priority == nil { - job.Options.Priority = intPtr(10) + job.Options.Priority = utils.AsUint64Ptr(10) } - job.Options.ID = uuid.NewString() + job.Options.ID = utils.AsStringPtr(uuid.NewString()) j.pq.Insert(job) return job.Options.ID, nil } - return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} - -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; ok { + if _, ok := j.queues.Load(pipeline); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = true + j.queues.Store(pipeline, true) return nil } -func intPtr(val uint64) *uint64 { - if val == 0 { - val = 10 +func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { + // Use a batch response + // Add JobID to the payload to match responses + panic("todo") +} + +func (j *JobBroker) Stop(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.queues.Store(pipeline, false) + } + } +} + +func (j *JobBroker) StopAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, false) + return true + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.queues.Store(pipeline, true) + } } - return &val +} + +func (j *JobBroker) ResumeAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, true) + return true + }) +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { + panic("implement me") } |