summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral/broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go42
1 files changed, 37 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 905f5409..95f476a6 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,20 +1,38 @@
package ephemeral
import (
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
+ jobs chan *entry
+ queues map[*pipeline.Pipeline]*queue
+ pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
- return &JobBroker{}, nil
+ jb := &JobBroker{
+ jobs: make(chan *entry, 10),
+ pq: q,
+ }
+
+ go jb.serve()
+
+ return jb, nil
}
-func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) {
- panic("implement me")
+func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
+ id := uuid.NewString()
+
+ j.jobs <- &entry{
+ id: id,
+ }
+
+ return id, nil
}
func (j *JobBroker) Stat() {
@@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) {
- panic("implement me")
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.queues[pipeline]; !ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ }
+
+ j.queues[pipeline] = newQueue()
+
+ return nil
+}
+
+func (j *JobBroker) serve() {
+ for item := range j.jobs {
+ // item should satisfy
+ j.pq.Push(item)
+ }
}