diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 /plugins/jobs/brokers/ephemeral | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/broker.go | 82 | ||||
-rw-r--r-- | plugins/jobs/brokers/ephemeral/plugin.go | 6 |
2 files changed, 61 insertions, 27 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go index 3eb20c27..4bbb4095 100644 --- a/plugins/jobs/brokers/ephemeral/broker.go +++ b/plugins/jobs/brokers/ephemeral/broker.go @@ -1,70 +1,104 @@ package ephemeral import ( + "sync" + "github.com/google/uuid" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/utils" ) type JobBroker struct { - queues map[string]bool + queues sync.Map pq priorityqueue.Queue } func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - queues: make(map[string]bool), + queues: sync.Map{}, pq: q, } return jb, nil } -func (j *JobBroker) Push(job *structs.Job) (string, error) { +func (j *JobBroker) Push(job *structs.Job) (*string, error) { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.queues[job.Options.Pipeline]; ok { - if !b { - return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) + if b, ok := j.queues.Load(job.Options.Pipeline); ok { + if !b.(bool) { + return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline)) } if job.Options.Priority == nil { - job.Options.Priority = intPtr(10) + job.Options.Priority = utils.AsUint64Ptr(10) } - job.Options.ID = uuid.NewString() + job.Options.ID = utils.AsStringPtr(uuid.NewString()) j.pq.Insert(job) return job.Options.ID, nil } - return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) -} - -func (j *JobBroker) Stat() { - panic("implement me") -} - -func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { - panic("implement me") + return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } func (j *JobBroker) Register(pipeline string) error { const op = errors.Op("ephemeral_register") - if _, ok := j.queues[pipeline]; ok { + if _, ok := j.queues.Load(pipeline); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) } - j.queues[pipeline] = true + j.queues.Store(pipeline, true) return nil } -func intPtr(val uint64) *uint64 { - if val == 0 { - val = 10 +func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) { + // Use a batch response + // Add JobID to the payload to match responses + panic("todo") +} + +func (j *JobBroker) Stop(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == true { + // mark pipeline as turned off + j.queues.Store(pipeline, false) + } + } +} + +func (j *JobBroker) StopAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, false) + return true + }) +} + +func (j *JobBroker) Resume(pipeline string) { + if q, ok := j.queues.Load(pipeline); ok { + if q == false { + // mark pipeline as turned off + j.queues.Store(pipeline, true) + } } - return &val +} + +func (j *JobBroker) ResumeAll() { + j.queues.Range(func(key, value interface{}) bool { + j.queues.Store(key, true) + return true + }) +} + +func (j *JobBroker) Stat() { + panic("implement me") +} + +func (j *JobBroker) Consume(pipe *pipeline.Pipeline) { + panic("implement me") } diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go index 146d1fdc..3d6a95b7 100644 --- a/plugins/jobs/brokers/ephemeral/plugin.go +++ b/plugins/jobs/brokers/ephemeral/plugin.go @@ -1,8 +1,8 @@ package ephemeral import ( - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -23,6 +23,6 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) InitJobBroker(q priorityqueue.Queue) (jobs.Consumer, error) { +func (p *Plugin) JobsConstruct(_ string, q priorityqueue.Queue) (jobs.Consumer, error) { return NewJobBroker(q) } |