diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 68 |
1 files changed, 40 insertions, 28 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6bf43a11..67077920 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,13 +2,14 @@ package jobs import ( "context" - "fmt" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + priorityqueue "github.com/spiral/roadrunner/v2/common/priority_queue" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + priorityqueue2 "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -23,7 +24,7 @@ const ( ) type Plugin struct { - cfg *Config + cfg *Config `mapstructure:"jobs"` log logger.Logger workersPool pool.Pool @@ -41,10 +42,6 @@ type Plugin struct { pipelines pipeline.Pipelines } -func testListener(data interface{}) { - fmt.Println(data) -} - func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { const op = errors.Op("jobs_plugin_init") if !cfg.Has(PluginName) { @@ -60,7 +57,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.events = events.NewEventsHandler() - p.events.AddListener(testListener) p.brokers = make(map[string]Broker) p.consumers = make(map[string]Consumer) @@ -71,7 +67,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } // initialize priority queue - p.queue = priorityqueue.NewBinHeap() + p.queue = priorityqueue2.NewPriorityQueue() p.log = log return nil @@ -79,6 +75,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) + const op = errors.Op("jobs_plugin_serve") for name := range p.brokers { jb, err := p.brokers[name].InitJobBroker(p.queue) @@ -90,31 +87,48 @@ func (p *Plugin) Serve() chan error { p.consumers[name] = jb } + // register initial pipelines + for i := 0; i < len(p.pipelines); i++ { + pipe := p.pipelines[i] + + if jb, ok := p.consumers[pipe.Driver()]; ok { + err := jb.Register(pipe.Name()) + if err != nil { + errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) + return errCh + } + } + } + var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) if err != nil { errCh <- err return errCh } - // initialize sub-plugins - // provide a queue to them - // start consume loop - // start resp loop + // start listening + go func() { + for { + // get data JOB from the queue + job := p.queue.GetMax() + + if job == nil { + continue + } - /* - go func() { - for { - // get data JOB from the queue - job := p.queue.Pop() + exec := payload.Payload{ + Context: job.Context(), + Body: job.Body(), + } - // request - _ = job - p.workersPool.Exec(nil) + _, err = p.workersPool.Exec(exec) + if err != nil { + panic(err) } - }() + } + }() - */ return errCh } @@ -141,18 +155,16 @@ func (p *Plugin) Name() string { func (p *Plugin) Push(j *structs.Job) (string, error) { pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Broker()] + broker, ok := p.consumers[pipe.Driver()] if !ok { panic("broker not found") } - id, err := broker.Push(pipe, j) + id, err := broker.Push(j) if err != nil { panic(err) } - // p.events.Push() - return id, nil } |