summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go68
1 files changed, 40 insertions, 28 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6bf43a11..67077920 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,13 +2,14 @@ package jobs
import (
"context"
- "fmt"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -23,7 +24,7 @@ const (
)
type Plugin struct {
- cfg *Config
+ cfg *Config `mapstructure:"jobs"`
log logger.Logger
workersPool pool.Pool
@@ -41,10 +42,6 @@ type Plugin struct {
pipelines pipeline.Pipelines
}
-func testListener(data interface{}) {
- fmt.Println(data)
-}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
@@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.events = events.NewEventsHandler()
- p.events.AddListener(testListener)
p.brokers = make(map[string]Broker)
p.consumers = make(map[string]Consumer)
@@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
// initialize priority queue
- p.queue = priorityqueue.NewBinHeap()
+ p.queue = priorityqueue2.NewPriorityQueue()
p.log = log
return nil
@@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
+ const op = errors.Op("jobs_plugin_serve")
for name := range p.brokers {
jb, err := p.brokers[name].InitJobBroker(p.queue)
@@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error {
p.consumers[name] = jb
}
+ // register initial pipelines
+ for i := 0; i < len(p.pipelines); i++ {
+ pipe := p.pipelines[i]
+
+ if jb, ok := p.consumers[pipe.Driver()]; ok {
+ err := jb.Register(pipe.Name())
+ if err != nil {
+ errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
+ return errCh
+ }
+ }
+ }
+
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
if err != nil {
errCh <- err
return errCh
}
- // initialize sub-plugins
- // provide a queue to them
- // start consume loop
- // start resp loop
+ // start listening
+ go func() {
+ for {
+ // get data JOB from the queue
+ job := p.queue.GetMax()
+
+ if job == nil {
+ continue
+ }
- /*
- go func() {
- for {
- // get data JOB from the queue
- job := p.queue.Pop()
+ exec := payload.Payload{
+ Context: job.Context(),
+ Body: job.Body(),
+ }
- // request
- _ = job
- p.workersPool.Exec(nil)
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ panic(err)
}
- }()
+ }
+ }()
- */
return errCh
}
@@ -141,18 +155,16 @@ func (p *Plugin) Name() string {
func (p *Plugin) Push(j *structs.Job) (string, error) {
pipe := p.pipelines.Get(j.Options.Pipeline)
- broker, ok := p.consumers[pipe.Broker()]
+ broker, ok := p.consumers[pipe.Driver()]
if !ok {
panic("broker not found")
}
- id, err := broker.Push(pipe, j)
+ id, err := broker.Push(j)
if err != nil {
panic(err)
}
- // p.events.Push()
-
return id, nil
}