summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral/broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go50
1 files changed, 29 insertions, 21 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 4d357c34..3eb20c27 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -3,60 +3,68 @@ package ephemeral
import (
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
- jobs chan *entry
- queues map[*pipeline.Pipeline]*queue
+ queues map[string]bool
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- jobs: make(chan *entry, 10),
- pq: q,
+ queues: make(map[string]bool),
+ pq: q,
}
- go jb.serve()
-
return jb, nil
}
-func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
- id := uuid.NewString()
+func (j *JobBroker) Push(job *structs.Job) (string, error) {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ if b, ok := j.queues[job.Options.Pipeline]; ok {
+ if !b {
+ return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ }
+ if job.Options.Priority == nil {
+ job.Options.Priority = intPtr(10)
+ }
+ job.Options.ID = uuid.NewString()
+
+ j.pq.Insert(job)
- j.jobs <- &entry{
- id: id,
+ return job.Options.ID, nil
}
- return id, nil
+ return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Stat() {
panic("implement me")
}
-func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; !ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ if _, ok := j.queues[pipeline]; ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = newQueue()
+ j.queues[pipeline] = true
return nil
}
-func (j *JobBroker) serve() {
- for item := range j.jobs {
- // item should satisfy
- j.pq.Insert(item)
+func intPtr(val uint64) *uint64 {
+ if val == 0 {
+ val = 10
}
+ return &val
}