summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-07 18:33:04 +0300
committerValery Piashchynski <[email protected]>2021-07-07 18:33:04 +0300
commit60c229c8506df465586434309af5acd1f84e2406 (patch)
tree18fdf380b7e032415d656e84bcc3c7a057f194a8 /plugins/jobs/plugin.go
parent127186a72d4b8d30f6ada72ade661d8713490728 (diff)
Updated ephemeral plugin, PQ and protobuf...
Implement core of the root jobs plugin with a proper drivers/pipelines handling mechanism. Add delayed jobs for the ephemeral plugin. Remove ResumeAll, Resume, StopAll, Stop. Replaced with Pause/Resume with a slice of the pipelines. Other small improvements. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go161
1 files changed, 123 insertions, 38 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index c3f766b9..d603dce6 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -2,6 +2,7 @@ package jobs
import (
"context"
+ "fmt"
"sync"
endure "github.com/spiral/endure/pkg/container"
@@ -16,13 +17,14 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/utils"
)
const (
// RrJobs env variable
RrJobs string = "rr_jobs"
PluginName string = "jobs"
+
+ pipelines string = "pipelines"
)
type Plugin struct {
@@ -42,8 +44,8 @@ type Plugin struct {
// priority queue implementation
queue priorityqueue.Queue
- // parent config for broken options.
- pipelines pipeline.Pipelines
+ // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline
+ pipelines sync.Map
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -65,9 +67,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]jobs.Consumer)
// initial set of pipelines
- p.pipelines, err = pipeline.InitPipelines(p.cfg.Pipelines)
- if err != nil {
- return errors.E(op, err)
+ for i := range p.cfg.Pipelines {
+ p.pipelines.Store(i, p.cfg.Pipelines[i])
}
// initialize priority queue
@@ -81,28 +82,42 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
- for name := range p.jobConstructors {
- jb, err := p.jobConstructors[name].JobsConstruct("", p.queue)
- if err != nil {
- errCh <- err
- return errCh
- }
-
- p.consumers[name] = jb
- }
-
// register initial pipelines
- for i := 0; i < len(p.pipelines); i++ {
- pipe := p.pipelines[i]
+ p.pipelines.Range(func(key, value interface{}) bool {
+ // pipeline name (ie test-local, sqs-aws, etc)
+ name := key.(string)
+
+ // pipeline associated with the name
+ pipe := value.(*pipeline.Pipeline)
+ // driver for the pipeline (ie amqp, ephemeral, etc)
+ dr := pipe.Driver()
+
+ // jobConstructors contains constructors for the drivers
+ // we need here to initialize these drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // config key for the particular sub-driver jobs.pipelines.test-local
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
+
+ // init the driver
+ initializedDriver, err := c.JobsConstruct(configKey, p.queue)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
- if jb, ok := p.consumers[pipe.Driver()]; ok {
- err := jb.Register(pipe.Name())
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[name] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(pipe)
if err != nil {
errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
- return errCh
+ return false
}
}
- }
+
+ return true
+ })
var err error
p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
@@ -119,12 +134,18 @@ func (p *Plugin) Serve() chan error {
// get data JOB from the queue
job := p.queue.GetMax()
+ ctx, err := job.Context()
+ if err != nil {
+ job.Nack()
+ p.log.Error("job marshal context", "error", err)
+ }
+
exec := payload.Payload{
- Context: job.Context(),
+ Context: ctx,
Body: job.Body(),
}
- _, err := p.workersPool.Exec(exec)
+ _, err = p.workersPool.Exec(exec)
if err != nil {
job.Nack()
p.log.Error("job execute", "error", err)
@@ -160,41 +181,105 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Push(j *structs.Job) (*string, error) {
+func (p *Plugin) Push(j *structs.Job) error {
const op = errors.Op("jobs_plugin_push")
- pipe := p.pipelines.Get(j.Options.Pipeline)
- broker, ok := p.consumers[pipe.Driver()]
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j.Options.Pipeline)
if !ok {
- return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline))
}
- id, err := broker.Push(j)
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ if j.Options.Priority == 0 {
+ j.Options.Priority = ppl.Priority()
+ }
+
+ err := d.Push(j)
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
- return id, nil
+ return nil
}
-func (p *Plugin) PushBatch(j []*structs.Job) (*string, error) {
+func (p *Plugin) PushBatch(j []*structs.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
- pipe := p.pipelines.Get(j[i].Options.Pipeline)
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline))
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
- broker, ok := p.consumers[pipe.Driver()]
+ d, ok := p.consumers[ppl.Name()]
if !ok {
- return nil, errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", pipe.Driver()))
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ if j[i].Options.Priority == 0 {
+ j[i].Options.Priority = ppl.Priority()
}
- _, err := broker.Push(j[i])
+ err := d.Push(j[i])
if err != nil {
- return nil, errors.E(op, err)
+ return errors.E(op, err)
}
}
- return utils.AsStringPtr("test"), nil
+ return nil
+}
+
+func (p *Plugin) Pause(pipelines []string) {
+ for i := 0; i < len(pipelines); i++ {
+ pipe, ok := p.pipelines.Load(pipelines[i])
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pipelines[i])
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
+ return
+ }
+
+ // redirect call to the underlying driver
+ d.Pause(ppl.Name())
+ }
+}
+
+func (p *Plugin) Resume(pipelines []string) {
+ for i := 0; i < len(pipelines); i++ {
+ pipe, ok := p.pipelines.Load(pipelines[i])
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pipelines[i])
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
+ return
+ }
+
+ // redirect call to the underlying driver
+ d.Resume(ppl.Name())
+ }
}
func (p *Plugin) RPC() interface{} {