diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 41 |
1 files changed, 33 insertions, 8 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index e7466efb..90932edd 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/pool" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -25,9 +26,15 @@ type Plugin struct { log logger.Logger workersPool pool.Pool + server server.Server + brokers map[string]Broker consumers map[string]Consumer - events events.Handler + + events events.Handler + + // priority queue implementation + queue priorityqueue.Queue } func testListener(data interface{}) { @@ -50,21 +57,39 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) - if err != nil { - return errors.E(op, err) - } - + p.server = server p.events = events.NewEventsHandler() p.events.AddListener(testListener) + p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) + + // initialize priority queue + p.queue = priorityqueue.NewPriorityQueue() p.log = log + return nil } func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + for name := range p.brokers { + jb, err := p.brokers[name].InitJobBroker(p.queue) + if err != nil { + errCh <- err + return errCh + } + + p.consumers[name] = jb + } + + var err error + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + if err != nil { + errCh <- err + return errCh + } + // initialize sub-plugins // provide a queue to them // start consume loop @@ -83,8 +108,8 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) { - p.consumers[name.Name()] = c +func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) { + p.brokers[name.Name()] = c } func (p *Plugin) Available() {} |