summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-01 21:28:23 +0300
committerValery Piashchynski <[email protected]>2021-07-01 21:28:23 +0300
commita4b9d0c47494431c30f357c3616e53baf411360f (patch)
tree281f19cb031ff66f6360e11ec610dbc0a2fd7920 /plugins/jobs/plugin.go
parentcd64c6c44ad463963705a22af2bd49059987949c (diff)
- Remove Dispatcher
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go28
1 files changed, 14 insertions, 14 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index ab7222ae..690402d6 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -35,13 +36,16 @@ type Plugin struct {
// priority queue implementation
queue priorityqueue.Queue
+
+ // parent config for broken options.
+ pipelines pipeline.Pipelines
}
func testListener(data interface{}) {
fmt.Println(data)
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, pq priorityqueue.Queue) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -52,10 +56,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- err = p.cfg.InitDefaults()
- if err != nil {
- return errors.E(op, err)
- }
+ p.cfg.InitDefaults()
p.server = server
p.events = events.NewEventsHandler()
@@ -63,8 +64,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
+ // initial set of pipelines
+ p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// initialize priority queue
- p.queue = pq
+ p.queue = priorityqueue.NewPriorityQueue()
p.log = log
return nil
@@ -132,14 +139,7 @@ func (p *Plugin) Name() string {
}
func (p *Plugin) Push(j *structs.Job) (string, error) {
- pipe, pOpts, err := p.cfg.MatchPipeline(j)
- if err != nil {
- panic(err)
- }
-
- if pOpts != nil {
- j.Options.Merge(pOpts)
- }
+ pipe := p.pipelines.Get(j.Options.Pipeline)
broker, ok := p.consumers[pipe.Broker()]
if !ok {