diff options
author | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-06 17:30:31 +0300 |
commit | 2c78e93222cc9d3b88456175348e42f7f40c449b (patch) | |
tree | be4fc671db33ceb8700019a5ede900c8d900d7c0 /plugins/jobs/plugin.go | |
parent | 207739f7346c98e16087547bc510e1f909671260 (diff) |
Rework ephemeral and binary heaps
Implemented a sync.Cond for binary heap algo to save processor from
spinning in the for loop and receiving nil Items until the Queue will be
filled.
Add num_pollers option to the configuration to specify number of
pollers from the queue.
Add Resume, ResumeAll, Stop, StopAll, PushBatch methods to the ephemeral.
Remove map and use sync.Map in the ephemeral broker.
Add protobuf schema.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 68 |
1 files changed, 38 insertions, 30 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 67077920..8a80479b 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,14 +2,15 @@ package jobs import ( "context" + "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/pkg/priorityqueue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -27,11 +28,13 @@ type Plugin struct { cfg *Config `mapstructure:"jobs"` log logger.Logger + sync.RWMutex + workersPool pool.Pool server server.Server - brokers map[string]Broker - consumers map[string]Consumer + jobConstructors map[string]jobs.Constructor + consumers map[string]jobs.Consumer events events.Handler @@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.brokers = make(map[string]Broker) - p.consumers = make(map[string]Consumer) + p.jobConstructors = make(map[string]jobs.Constructor) + p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) @@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue2.NewPriorityQueue() + p.queue = priorityqueue.NewBinHeap() p.log = log return nil @@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.brokers { - jb, err := p.brokers[name].InitJobBroker(p.queue) + for name := range p.jobConstructors { + jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) if err != nil { errCh <- err return errCh @@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error { // start listening go func() { - for { - // get data JOB from the queue - job := p.queue.GetMax() - - if job == nil { - continue - } - - exec := payload.Payload{ - Context: job.Context(), - Body: job.Body(), - } - - _, err = p.workersPool.Exec(exec) - if err != nil { - panic(err) - } + for i := uint8(0); i < p.cfg.NumPollers; i++ { + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } + + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } + + _, err := p.workersPool.Exec(exec) + if err != nil { + panic(err) + } + } + }() } }() @@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { - p.brokers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { + p.jobConstructors[name.Name()] = c } func (p *Plugin) Available() {} @@ -152,12 +159,13 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (string, error) { +func (p *Plugin) Push(j *structs.Job) (*string, error) { + const op = errors.Op("jobs_plugin_push") pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Driver()] if !ok { - panic("broker not found") + return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) } id, err := broker.Push(j) |