diff options
author | Valery Piashchynski <[email protected]> | 2021-07-01 21:28:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-01 21:28:23 +0300 |
commit | a4b9d0c47494431c30f357c3616e53baf411360f (patch) | |
tree | 281f19cb031ff66f6360e11ec610dbc0a2fd7920 /plugins/jobs/plugin.go | |
parent | cd64c6c44ad463963705a22af2bd49059987949c (diff) |
- Remove Dispatcher
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index ab7222ae..690402d6 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -10,6 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" @@ -35,13 +36,16 @@ type Plugin struct { // priority queue implementation queue priorityqueue.Queue + + // parent config for broken options. + pipelines pipeline.Pipelines } func testListener(data interface{}) { fmt.Println(data) } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -52,10 +56,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - err = p.cfg.InitDefaults() - if err != nil { - return errors.E(op, err) - } + p.cfg.InitDefaults() p.server = server p.events = events.NewEventsHandler() @@ -63,8 +64,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) + // initial set of pipelines + p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) + if err != nil { + return errors.E(op, err) + } + // initialize priority queue - p.queue = pq + p.queue = priorityqueue.NewPriorityQueue() p.log = log return nil @@ -132,14 +139,7 @@ func (p *Plugin) Name() string { } func (p *Plugin) Push(j *structs.Job) (string, error) { - pipe, pOpts, err := p.cfg.MatchPipeline(j) - if err != nil { - panic(err) - } - - if pOpts != nil { - j.Options.Merge(pOpts) - } + pipe := p.pipelines.Get(j.Options.Pipeline) broker, ok := p.consumers[pipe.Broker()] if !ok { |