diff options
author | Valery Piashchynski <[email protected]> | 2021-06-30 11:08:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-30 11:08:40 +0300 |
commit | 2ac3b240b118961c1a30cc18dd22d08b7fac6516 (patch) | |
tree | 25f48908286a05ea78e4049d89f88450d0541f99 /plugins/jobs/brokers/ephemeral | |
parent | c0f808bb8c7077e18aa197f024628b9912def58b (diff) |
- Update arch diagrams
- Update ephemeral plugin
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 42 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/entry.go | 21 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/queue.go | 7 |
3 files changed, 65 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 905f5409..95f476a6 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,20 +1,38 @@ package ephemeral import ( + "github.com/google/uuid" + "github.com/spiral/errors" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" ) type JobBroker struct { + jobs chan *entry + queues map[*pipeline.Pipeline]*queue + pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { - return &JobBroker{}, nil + jb := &JobBroker{ + jobs: make(chan *entry, 10), + pq: q, + } + + go jb.serve() + + return jb, nil } -func (j *JobBroker) Push(pipeline *pipeline.Pipeline, job *structs.Job) (string, error) { - panic("implement me") +func (j *JobBroker) Push(pipe *pipeline.Pipeline, job *structs.Job) (string, error) { + id := uuid.NewString() + + j.jobs <- &entry{ + id: id, + } + + return id, nil } func (j *JobBroker) Stat() { @@ -25,6 +43,20 @@ func (j *JobBroker) Consume(pipeline *pipeline.Pipeline) { panic("implement me") } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) { - panic("implement me") +func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { + const op = errors.Op("ephemeral_register") + if _, ok := j.queues[pipeline]; !ok { + return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name())) + } + + j.queues[pipeline] = newQueue() + + return nil +} + +func (j *JobBroker) serve() { + for item := range j.jobs { + // item should satisfy + j.pq.Push(item) + } } diff --git a/plugins/jobs/brokers/ephemeral/entry.go b/plugins/jobs/brokers/ephemeral/entry.go new file mode 100644 index 00000000..bf8796d5 --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/entry.go @@ -0,0 +1,21 @@ +package ephemeral + +type entry struct { + id string +} + +func (e *entry) ID() string { + return e.id +} + +func (e *entry) Ask() { + // no-op +} + +func (e *entry) Nack() { + // no-op +} + +func (e *entry) Payload() []byte { + panic("implement me") +} diff --git a/plugins/jobs/brokers/ephemeral/queue.go b/plugins/jobs/brokers/ephemeral/queue.go new file mode 100644 index 00000000..1c6d865b --- /dev/null +++ b/plugins/jobs/brokers/ephemeral/queue.go @@ -0,0 +1,7 @@ +package ephemeral + +type queue struct{} + +func newQueue() *queue { + return &queue{} +} |