diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 4d357c34..3eb20c27 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -3,60 +3,68 @@ package ephemeral import ( "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { - jobs chan *entry - queues map[*pipeline.Pipeline]*queue + queues map[string]bool pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - jobs: make(chan *entry, 10), - pq: q, + queues: make(map[string]bool), + pq: q, } - go jb.serve() - return jb, nil } -func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { - id := uuid.NewString() +func (j *JobBroker) Push(job *structs.Job) (string, error) { + const op = errors.Op("ephemeral_push") + + // check if the pipeline registered + if b, ok := j.queues[job.Options.Pipeline]; ok { + if !b { + return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + } + if job.Options.Priority == nil { + job.Options.Priority = intPtr(10) + } + job.Options.ID = uuid.NewString() + + j.pq.Insert(job) - j.jobs <- &entry{ - id: id, + return job.Options.ID, nil } - return id, nil + return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Stat() { panic("implement me") } -func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { +func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; !ok { - return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + if _, ok := j.queues[pipeline]; ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = newQueue() + j.queues[pipeline] = true return nil } -func (j *JobBroker) serve() { - for item := range j.jobs { - // item should satisfy - j.pq.Insert(item) +func intPtr(val uint64) *uint64 { + if val == 0 { + val = 10 } + return &val } |