summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go68
1 files changed, 38 insertions, 30 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 67077920..8a80479b 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,14 +2,15 @@ package jobs
import (
"context"
+ "sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -27,11 +28,13 @@ type Plugin struct {
cfg *Config `mapstructure:"jobs"`
log logger.Logger
+ sync.RWMutex
+
workersPool pool.Pool
server server.Server
- brokers map[string]Broker
- consumers map[string]Consumer
+ jobConstructors map[string]jobs.Constructor
+ consumers map[string]jobs.Consumer
events events.Handler
@@ -57,8 +60,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.brokers = make(map[string]Broker)
- p.consumers = make(map[string]Consumer)
+ p.jobConstructors = make(map[string]jobs.Constructor)
+ p.consumers = make(map[string]jobs.Consumer)
// initial set of pipelines
p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
@@ -67,7 +70,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue2.NewPriorityQueue()
+ p.queue = priorityqueue.NewBinHeap()
p.log = log
return nil
@@ -77,8 +80,8 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- for name := range p.brokers {
- jb, err := p.brokers[name].InitJobBroker(p.queue)
+ for name := range p.jobConstructors {
+ jb, err := p.jobConstructors[name].JobsConstruct("", p.queue)
if err != nil {
errCh <- err
return errCh
@@ -109,23 +112,27 @@ func (p *Plugin) Serve() chan error {
// start listening
go func() {
- for {
- // get data JOB from the queue
- job := p.queue.GetMax()
-
- if job == nil {
- continue
- }
-
- exec := payload.Payload{
- Context: job.Context(),
- Body: job.Body(),
- }
-
- _, err = p.workersPool.Exec(exec)
- if err != nil {
- panic(err)
- }
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
+
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
+
+ _, err := p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
+ }
+ }
+ }()
}
}()
@@ -142,8 +149,8 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) CollectMQBrokers(name endure.Named, c Broker) {
- p.brokers[name.Name()] = c
+func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
+ p.jobConstructors[name.Name()] = c
}
func (p *Plugin) Available() {}
@@ -152,12 +159,13 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Push(j *structs.Job) (string, error) {
+func (p *Plugin) Push(j *structs.Job) (*string, error) {
+ const op = errors.Op("jobs_plugin_push")
pipe := p.pipelines.Get(j.Options.Pipeline)
broker, ok := p.consumers[pipe.Driver()]
if !ok {
- panic("broker not found")
+ return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
}
id, err := broker.Push(j)