summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go41
1 files changed, 33 insertions, 8 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index e7466efb..90932edd 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/pool"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -25,9 +26,15 @@ type Plugin struct {
log logger.Logger
workersPool pool.Pool
+ server server.Server
+ brokers map[string]Broker
consumers map[string]Consumer
- events events.Handler
+
+ events events.Handler
+
+ // priority queue implementation
+ queue priorityqueue.Queue
}
func testListener(data interface{}) {
@@ -50,21 +57,39 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
- if err != nil {
- return errors.E(op, err)
- }
-
+ p.server = server
p.events = events.NewEventsHandler()
p.events.AddListener(testListener)
+ p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
+
+ // initialize priority queue
+ p.queue = priorityqueue.NewPriorityQueue()
p.log = log
+
return nil
}
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ for name := range p.brokers {
+ jb, err := p.brokers[name].InitJobBroker(p.queue)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ p.consumers[name] = jb
+ }
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
// initialize sub-plugins
// provide a queue to them
// start consume loop
@@ -83,8 +108,8 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) {
- p.consumers[name.Name()] = c
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
+ p.brokers[name.Name()] = c
}
func (p *Plugin) Available() {}