summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-30 11:08:40 +0300
committerValery Piashchynski <[email protected]>2021-06-30 11:08:40 +0300
commit2ac3b240b118961c1a30cc18dd22d08b7fac6516 (patch)
tree25f48908286a05ea78e4049d89f88450d0541f99 /plugins/jobs/brokers/ephemeral
parentc0f808bb8c7077e18aa197f024628b9912def58b (diff)
- Update arch diagrams
- Update ephemeral plugin Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go42
-rw-r--r--plugins/jobs/brokers/ephemeral/entry.go21
-rw-r--r--plugins/jobs/brokers/ephemeral/queue.go7
3 files changed, 65 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 905f5409..95f476a6 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,20 +1,38 @@
package ephemeral
import (
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
)
type JobBroker struct {
+ jobs chan *entry
+ queues map[*pipeline.Pipeline]*queue
+ pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
- return &JobBroker{}, nil
+ jb := &JobBroker{
+ jobs: make(chan *entry, 10),
+ pq: q,
+ }
+
+ go jb.serve()
+
+ return jb, nil
}
-func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) {
- panic("implement me")
+func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) {
+ id := uuid.NewString()
+
+ j.jobs <- &entry{
+ id: id,
+ }
+
+ return id, nil
}
func (j *JobBroker) Stat() {
@@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) {
panic("implement me")
}
-func (j *JobBroker) Register(pipeline *pipeline.Pipeline) {
- panic("implement me")
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.queues[pipeline]; !ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ }
+
+ j.queues[pipeline] = newQueue()
+
+ return nil
+}
+
+func (j *JobBroker) serve() {
+ for item := range j.jobs {
+ // item should satisfy
+ j.pq.Push(item)
+ }
}
diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go
new file mode 100644
index 00000000..bf8796d5
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/entry.go
@@ -0,0 +1,21 @@
+package ephemeral
+
+type entry struct {
+ id string
+}
+
+func (e *entry) ID() string {
+ return e.id
+}
+
+func (e *entry) Ask() {
+ // no-op
+}
+
+func (e *entry) Nack() {
+ // no-op
+}
+
+func (e *entry) Payload() []byte {
+ panic("implement me")
+}
diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go
new file mode 100644
index 00000000..1c6d865b
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/queue.go
@@ -0,0 +1,7 @@
+package ephemeral
+
+type queue struct{}
+
+func newQueue() *queue {
+ return &queue{}
+}