diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 905f5409..95f476a6 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,20 +1,38 @@ package ephemeral import ( + "github.com/google/uuid" + "github.com/spiral/errors" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { + jobs chan *entry + queues map[*pipeline.Pipeline]*queue + pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { - return &JobBroker{}, nil + jb := &JobBroker{ + jobs: make(chan *entry, 10), + pq: q, + } + + go jb.serve() + + return jb, nil } -func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) { - panic("implement me") +func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { + id := uuid.NewString() + + j.jobs <- &entry{ + id: id, + } + + return id, nil } func (j *JobBroker) Stat() { @@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) { - panic("implement me") +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { + const op = errors.Op("ephemeral_register") + if _, ok := j.queues[pipeline]; !ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + } + + j.queues[pipeline] = newQueue() + + return nil +} + +func (j *JobBroker) serve() { + for item := range j.jobs { + // item should satisfy + j.pq.Push(item) + } } |