diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 48 |
1 files changed, 41 insertions, 7 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index e31e3b25..5cf4c633 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -6,21 +6,47 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" + "github.com/spiral/roadrunner/v2/plugins/logger" ) +type Config struct { + PipelineSize uint64 `mapstructure:"pipeline_size"` +} + type JobBroker struct { - queues sync.Map - pq priorityqueue.Queue + cfg *Config + log logger.Logger + queues sync.Map + pq priorityqueue.Queue + localQueue chan *Item } -func NewJobBroker(q priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { + const op = errors.Op("new_ephemeral_pipeline") + jb := &JobBroker{ - queues: sync.Map{}, - pq: q, + log: log, + pq: q, + } + + err := cfg.UnmarshalKey(configKey, &jb.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + if jb.cfg.PipelineSize == 0 { + jb.cfg.PipelineSize = 100_000 } + // initialize a local queue + jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + + // consume from the queue + go jb.consume() + return jb, nil } @@ -39,13 +65,14 @@ func (j *JobBroker) Push(job *structs.Job) error { time.Sleep(jj.Options.TimeoutDuration()) // send the item after timeout expired - j.pq.Insert(From(job)) + j.localQueue <- From(job) }(job) return nil } - j.pq.Insert(From(job)) + // insert to the local, limited pipeline + j.localQueue <- From(job) return nil } @@ -53,6 +80,13 @@ func (j *JobBroker) Push(job *structs.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) } +func (j *JobBroker) consume() { + // redirect + for item := range j.localQueue { + j.pq.Insert(item) + } +} + func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.queues.Load(pipeline.Name()); ok { |