diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 18:33:04 +0300 |
commit | 60c229c8506df465586434309af5acd1f84e2406 (patch) | |
tree | 18fdf380b7e032415d656e84bcc3c7a057f194a8 /plugins/jobs/plugin.go | |
parent | 127186a72d4b8d30f6ada72ade661d8713490728 (diff) |
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines
handling mechanism.
Add delayed jobs for the ephemeral plugin.
Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with
a slice of the pipelines.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 161 |
1 files changed, 123 insertions, 38 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index c3f766b9..d603dce6 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -2,6 +2,7 @@ package jobs import ( "context" + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" @@ -16,13 +17,14 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" ) const ( // RrJobs env variable RrJobs string = "rr_jobs" PluginName string = "jobs" + + pipelines string = "pipelines" ) type Plugin struct { @@ -42,8 +44,8 @@ type Plugin struct { // priority queue implementation queue priorityqueue.Queue - // parent config for broken options. - pipelines pipeline.Pipelines + // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline + pipelines sync.Map } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,9 +67,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) // initial set of pipelines - p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines) - if err != nil { - return errors.E(op, err) + for i := range p.cfg.Pipelines { + p.pipelines.Store(i, p.cfg.Pipelines[i]) } // initialize priority queue @@ -81,28 +82,42 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("jobs_plugin_serve") - for name := range p.jobConstructors { - jb, err := p.jobConstructors[name].JobsConstruct("", p.queue) - if err != nil { - errCh <- err - return errCh - } - - p.consumers[name] = jb - } - // register initial pipelines - for i := 0; i < len(p.pipelines); i++ { - pipe := p.pipelines[i] + p.pipelines.Range(func(key, value interface{}) bool { + // pipeline name (ie test-local, sqs-aws, etc) + name := key.(string) + + // pipeline associated with the name + pipe := value.(*pipeline.Pipeline) + // driver for the pipeline (ie amqp, ephemeral, etc) + dr := pipe.Driver() + + // jobConstructors contains constructors for the drivers + // we need here to initialize these drivers for the pipelines + if c, ok := p.jobConstructors[dr]; ok { + // config key for the particular sub-driver jobs.pipelines.test-local + configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) + + // init the driver + initializedDriver, err := c.JobsConstruct(configKey, p.queue) + if err != nil { + errCh <- errors.E(op, err) + return false + } - if jb, ok := p.consumers[pipe.Driver()]; ok { - err := jb.Register(pipe.Name()) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers[name] = initializedDriver + + // register pipeline for the initialized driver + err = initializedDriver.Register(pipe) if err != nil { errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) - return errCh + return false } } - } + + return true + }) var err error p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}) @@ -119,12 +134,18 @@ func (p *Plugin) Serve() chan error { // get data JOB from the queue job := p.queue.GetMax() + ctx, err := job.Context() + if err != nil { + job.Nack() + p.log.Error("job marshal context", "error", err) + } + exec := payload.Payload{ - Context: job.Context(), + Context: ctx, Body: job.Body(), } - _, err := p.workersPool.Exec(exec) + _, err = p.workersPool.Exec(exec) if err != nil { job.Nack() p.log.Error("job execute", "error", err) @@ -160,41 +181,105 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Push(j *structs.Job) (*string, error) { +func (p *Plugin) Push(j *structs.Job) error { const op = errors.Op("jobs_plugin_push") - pipe := p.pipelines.Get(j.Options.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline)) } - id, err := broker.Push(j) + // type conversion + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j.Options.Priority == 0 { + j.Options.Priority = ppl.Priority() + } + + err := d.Push(j) if err != nil { - panic(err) + return errors.E(op, err) } - return id, nil + return nil } -func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) { +func (p *Plugin) PushBatch(j []*structs.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { - pipe := p.pipelines.Get(j[i].Options.Pipeline) + // get the pipeline for the job + pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline)) + } + + ppl := pipe.(*pipeline.Pipeline) - broker, ok := p.consumers[pipe.Driver()] + d, ok := p.consumers[ppl.Name()] if !ok { - return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver())) + return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) + } + + // if job has no priority, inherit it from the pipeline + if j[i].Options.Priority == 0 { + j[i].Options.Priority = ppl.Priority() } - _, err := broker.Push(j[i]) + err := d.Push(j[i]) if err != nil { - return nil, errors.E(op, err) + return errors.E(op, err) } } - return utils.AsStringPtr("test"), nil + return nil +} + +func (p *Plugin) Pause(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Pause(ppl.Name()) + } +} + +func (p *Plugin) Resume(pipelines []string) { + for i := 0; i < len(pipelines); i++ { + pipe, ok := p.pipelines.Load(pipelines[i]) + if !ok { + p.log.Error("no such pipeline", "requested", pipelines[i]) + } + + ppl := pipe.(*pipeline.Pipeline) + + d, ok := p.consumers[ppl.Name()] + if !ok { + p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i]) + return + } + + // redirect call to the underlying driver + d.Resume(ppl.Name()) + } } func (p *Plugin) RPC() interface{} { |