summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral/broker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
committerValery Piashchynski <[email protected]>2021-07-06 17:30:31 +0300
commit2c78e93222cc9d3b88456175348e42f7f40c449b (patch)
treebe4fc671db33ceb8700019a5ede900c8d900d7c0 /plugins/jobs/brokers/ephemeral/broker.go
parent207739f7346c98e16087547bc510e1f909671260 (diff)
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from spinning in the for loop and receiving nil Items until the Queue will be filled. Add num_pollers option to the configuration to specify number of pollers from the queue. Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral. Remove map and use sync.Map in the ephemeral broker. Add protobuf schema. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/broker.go')
-rw-r--r--plugins/jobs/brokers/ephemeral/broker.go82
1 files changed, 58 insertions, 24 deletions
diff --git a/plugins/jobs/brokers/ephemeral/broker.go b/plugins/jobs/brokers/ephemeral/broker.go
index 3eb20c27..4bbb4095 100644
--- a/plugins/jobs/brokers/ephemeral/broker.go
+++ b/plugins/jobs/brokers/ephemeral/broker.go
@@ -1,70 +1,104 @@
package ephemeral
import (
+ "sync"
+
"github.com/google/uuid"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobBroker struct {
- queues map[string]bool
+ queues sync.Map
pq priorityqueue.Queue
}
func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- queues: make(map[string]bool),
+ queues: sync.Map{},
pq: q,
}
return jb, nil
}
-func (j *JobBroker) Push(job *structs.Job) (string, error) {
+func (j *JobBroker) Push(job *structs.Job) (*string, error) {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.queues[job.Options.Pipeline]; ok {
- if !b {
- return "", errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
+ if b, ok := j.queues.Load(job.Options.Pipeline); ok {
+ if !b.(bool) {
+ return nil, errors.E(op, errors.Errorf("pipeline disabled: %s", job.Options.Pipeline))
}
if job.Options.Priority == nil {
- job.Options.Priority = intPtr(10)
+ job.Options.Priority = utils.AsUint64Ptr(10)
}
- job.Options.ID = uuid.NewString()
+ job.Options.ID = utils.AsStringPtr(uuid.NewString())
j.pq.Insert(job)
return job.Options.ID, nil
}
- return "", errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
-}
-
-func (j *JobBroker) Stat() {
- panic("implement me")
-}
-
-func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
- panic("implement me")
+ return nil, errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
}
func (j *JobBroker) Register(pipeline string) error {
const op = errors.Op("ephemeral_register")
- if _, ok := j.queues[pipeline]; ok {
+ if _, ok := j.queues.Load(pipeline); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
}
- j.queues[pipeline] = true
+ j.queues.Store(pipeline, true)
return nil
}
-func intPtr(val uint64) *uint64 {
- if val == 0 {
- val = 10
+func (j *JobBroker) PushBatch(job *[]structs.Job) (*string, error) {
+ // Use a batch response
+ // Add JobID to the payload to match responses
+ panic("todo")
+}
+
+func (j *JobBroker) Stop(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, false)
+ }
+ }
+}
+
+func (j *JobBroker) StopAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, false)
+ return true
+ })
+}
+
+func (j *JobBroker) Resume(pipeline string) {
+ if q, ok := j.queues.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned off
+ j.queues.Store(pipeline, true)
+ }
}
- return &val
+}
+
+func (j *JobBroker) ResumeAll() {
+ j.queues.Range(func(key, value interface{}) bool {
+ j.queues.Store(key, true)
+ return true
+ })
+}
+
+func (j *JobBroker) Stat() {
+ panic("implement me")
+}
+
+func (j *JobBroker) Consume(pipe *pipeline.Pipeline) {
+ panic("implement me")
}