Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 573
1 file changed, 573 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
new file mode 100644
index 00000000..26015516
--- /dev/null
+++ b/plugins/jobs/plugin.go
@@ -0,0 +1,573 @@
package jobs

import (
	"context"
	"fmt"
	"sync"
	"time"

	endure "github.com/spiral/endure/pkg/container"
	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/common/jobs"
	"github.com/spiral/roadrunner/v2/pkg/events"
	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
	priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
	"github.com/spiral/roadrunner/v2/plugins/config"
	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
	"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
	"github.com/spiral/roadrunner/v2/plugins/logger"
	"github.com/spiral/roadrunner/v2/plugins/server"
)

const (
	// RrMode is the env variable used to propagate the RR mode to workers
	RrMode     string = "RR_MODE"
	RrModeJobs string = "jobs"

	PluginName string = "jobs"
	pipelines  string = "pipelines"
)

type Plugin struct {
	sync.RWMutex

	// Jobs plugin configuration
	cfg         *Config `structure:"jobs"`
	log         logger.Logger
	workersPool pool.Pool
	server      server.Server

	jobConstructors map[string]jobs.Constructor
	consumers       map[string]jobs.Consumer

	// events handler
	events events.Handler

	// priority queue implementation
	queue priorityqueue.Queue

	// registered pipelines: keys are pipeline names, values are pointers to the associated pipeline
	pipelines sync.Map

	// initial set of the pipelines to consume
	consume map[string]struct{}

	// signal channel to stop the pollers
	stopCh chan struct{}

	// internal payloads pool
	pldPool sync.Pool
}

func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
	const op = errors.Op("jobs_plugin_init")
	if !cfg.Has(PluginName) {
		return errors.E(op, errors.Disabled)
	}

	err := cfg.UnmarshalKey(PluginName, &p.cfg)
	if err != nil {
		return errors.E(op, err)
	}

	p.cfg.InitDefaults()

	p.server = server

	p.events = events.NewEventsHandler()
	p.events.AddListener(p.collectJobsEvents)

	p.jobConstructors = make(map[string]jobs.Constructor)
	p.consumers = make(map[string]jobs.Consumer)
	p.consume = make(map[string]struct{})
	p.stopCh = make(chan struct{}, 1)

	p.pldPool = sync.Pool{New: func() interface{} {
		// allocated with nil fields
		return &payload.Payload{}
	}}

	// initial set of pipelines
	for i := range p.cfg.Pipelines {
		p.pipelines.Store(i, p.cfg.Pipelines[i])
	}

	if len(p.cfg.Consume) > 0 {
		for i := 0; i < len(p.cfg.Consume); i++ {
			p.consume[p.cfg.Consume[i]] = struct{}{}
		}
	}

	// initialize priority queue
	p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
	p.log = log

	return nil
}

func (p *Plugin) Serve() chan error { //nolint:gocognit
	errCh := make(chan error, 1)
	const op = errors.Op("jobs_plugin_serve")

	// register initial pipelines
	p.pipelines.Range(func(key, value interface{}) bool {
		t := time.Now()
		// pipeline name (e.g. test-local, sqs-aws)
		name := key.(string)

		// pipeline associated with the name
		pipe := value.(*pipeline.Pipeline)
		// driver for the pipeline (e.g. amqp, ephemeral)
		dr := pipe.Driver()

		// jobConstructors contains constructors for the drivers;
		// we need them here to initialize the drivers for the pipelines
		if c, ok := p.jobConstructors[dr]; ok {
			// config key for the particular sub-driver, e.g. jobs.pipelines.test-local
			configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
			// init the driver
			initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
			if err != nil {
				errCh <- errors.E(op, err)
				return false
			}

			// add driver to the set of the consumers (key - pipeline name, value - associated driver)
			p.consumers[name] = initializedDriver

			// register pipeline for the initialized driver
			err = initializedDriver.Register(context.Background(), pipe)
			if err != nil {
				errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
				return false
			}

			// if the pipeline is configured to be consumed, call Run on it
			if _, ok := p.consume[name]; ok {
				ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
				defer cancel()
				err = initializedDriver.Run(ctx, pipe)
				if err != nil {
					errCh <- errors.E(op, err)
					return false
				}
			}

			p.events.Push(events.JobEvent{
				Event:    events.EventDriverReady,
				Pipeline: pipe.Name(),
				Driver:   pipe.Driver(),
				Start:    t,
				Elapsed:  time.Since(t),
			})
		}

		return true
	})

	var err error
	p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
	if err != nil {
		errCh <- err
		return errCh
	}

	// start listening
	go func() {
		for i := uint8(0); i < p.cfg.NumPollers; i++ {
			go func() {
				for {
					select {
					case <-p.stopCh:
						p.log.Info("------> job poller stopped <------")
						return
					default:
						// get the highest-priority JOB from the queue
						jb := p.queue.ExtractMin()

						// parse the context
						// for each job, the context contains:
						/*
							1. Job class
							2. Job ID provided from the outside
							3. Job Headers map[string][]string
							4. Timeout in seconds
							5. Pipeline name
						*/
						ctx, err := jb.Context()
						if err != nil {
							errNack := jb.Nack()
							if errNack != nil {
								p.log.Error("negative acknowledgement failed", "error", errNack)
							}
							p.log.Error("job marshal context", "error", err)
							continue
						}

						// get payload from the sync.Pool
						exec := p.getPayload(jb.Body(), ctx)

						// protect from the pool reset
						p.RLock()
						resp, err := p.workersPool.Exec(exec)
						p.RUnlock()
						if err != nil {
							// RR protocol level error, Nack the job
							errNack := jb.Nack()
							if errNack != nil {
								p.log.Error("negative acknowledgement failed", "error", errNack)
							}

							p.log.Error("job execute failed", "error", err)

							p.putPayload(exec)
							continue
						}

						// if the response or its body is nil, just acknowledge the job
						if resp == nil || resp.Body == nil {
							p.putPayload(exec)
							err = jb.Ack()
							if err != nil {
								p.log.Error("acknowledge error, job might be lost", "error", err)
							}
							continue
						}

						// handle the response protocol
						err = handleResponse(resp.Body, jb, p.log)
						if err != nil {
							p.putPayload(exec)
							errNack := jb.Nack()
							if errNack != nil {
								p.log.Error("negative acknowledgement failed, job might be lost", "root error", err, "error nack", errNack)
								continue
							}

							p.log.Error("job negatively acknowledged", "error", err)
							continue
						}

						// return payload to the pool
						p.putPayload(exec)
					}
				}
			}()
		}
	}()

	return errCh
}

func (p *Plugin) Stop() error {
	for k, v := range p.consumers {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
		err := v.Stop(ctx)
		if err != nil {
			cancel()
			p.log.Error("stop job driver", "driver", k, "error", err)
			continue
		}
		cancel()
	}
	// The goroutine below may block forever on stopCh; that's acceptable.
	// It gives the pollers a chance to exit, and if they don't, nothing is harmed.
	// The main goal here is to stop the drivers.
	go func() {
		for i := uint8(0); i < p.cfg.NumPollers; i++ {
			// stop jobs plugin pollers
			p.stopCh <- struct{}{}
		}
	}()

	// give the pollers 5 seconds to exit before returning
	time.Sleep(time.Second * 5)

	return nil
}

func (p *Plugin) Collects() []interface{} {
	return []interface{}{
		p.CollectMQBrokers,
	}
}

func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
	p.jobConstructors[name.Name()] = c
}

func (p *Plugin) Available() {}

func (p *Plugin) Name() string {
	return PluginName
}

func (p *Plugin) Reset() error {
	p.Lock()
	defer p.Unlock()

	const op = errors.Op("jobs_plugin_reset")
	p.log.Info("JOBS plugin received restart request. Restarting...")
	p.workersPool.Destroy(context.Background())
	p.workersPool = nil

	var err error
	p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
	if err != nil {
		return errors.E(op, err)
	}

	p.log.Info("JOBS workers pool successfully restarted")

	return nil
}

func (p *Plugin) Push(j *job.Job) error {
	const op = errors.Op("jobs_plugin_push")

	// get the pipeline for the job
	pipe, ok := p.pipelines.Load(j.Options.Pipeline)
	if !ok {
		return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline))
	}

	// type conversion
	ppl := pipe.(*pipeline.Pipeline)

	d, ok := p.consumers[ppl.Name()]
	if !ok {
		return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
	}

	// if the job has no priority, inherit it from the pipeline
	// TODO(rustatian) merge all options, not only priority
	if j.Options.Priority == 0 {
		j.Options.Priority = ppl.Priority()
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
	defer cancel()

	err := d.Push(ctx, j)
	if err != nil {
		return errors.E(op, err)
	}

	return nil
}

func (p *Plugin) PushBatch(j []*job.Job) error {
	const op = errors.Op("jobs_plugin_push_batch")

	for i := 0; i < len(j); i++ {
		// get the pipeline for the job
		pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
		if !ok {
			return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline))
		}

		ppl := pipe.(*pipeline.Pipeline)

		d, ok := p.consumers[ppl.Name()]
		if !ok {
			return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
		}

		// if the job has no priority, inherit it from the pipeline
		if j[i].Options.Priority == 0 {
			j[i].Options.Priority = ppl.Priority()
		}

		ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
		err := d.Push(ctx, j[i])
		if err != nil {
			cancel()
			return errors.E(op, err)
		}

		cancel()
	}

	return nil
}

func (p *Plugin) Pause(pp string) {
	pipe, ok := p.pipelines.Load(pp)
	if !ok {
		p.log.Error("no such pipeline", "requested", pp)
		return
	}

	ppl := pipe.(*pipeline.Pipeline)

	d, ok := p.consumers[ppl.Name()]
	if !ok {
		p.log.Warn("driver for the pipeline not found", "pipeline", pp)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
	defer cancel()
	// redirect call to the underlying driver
	d.Pause(ctx, ppl.Name())
}

func (p *Plugin) Resume(pp string) {
	pipe, ok := p.pipelines.Load(pp)
	if !ok {
		p.log.Error("no such pipeline", "requested", pp)
		return
	}

	ppl := pipe.(*pipeline.Pipeline)

	d, ok := p.consumers[ppl.Name()]
	if !ok {
		p.log.Warn("driver for the pipeline not found", "pipeline", pp)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
	defer cancel()
	// redirect call to the underlying driver
	d.Resume(ctx, ppl.Name())
}

// Declare a pipeline.
func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
	const op = errors.Op("jobs_plugin_declare")
	// driver for the pipeline (e.g. amqp, ephemeral)
	dr := pipeline.Driver()
	if dr == "" {
		return errors.E(op, errors.Errorf("no driver associated with the pipeline, pipeline name: %s", pipeline.Name()))
	}

	// jobConstructors contains constructors for the drivers;
	// we need them here to initialize the drivers for the pipelines
	if c, ok := p.jobConstructors[dr]; ok {
		// init the driver from the pipeline
		initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
		if err != nil {
			return errors.E(op, err)
		}

		// add driver to the set of the consumers (key - pipeline name, value - associated driver)
		p.consumers[pipeline.Name()] = initializedDriver

		// register pipeline for the initialized driver
		err = initializedDriver.Register(context.Background(), pipeline)
		if err != nil {
			return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
		}

		// if the pipeline is configured to be consumed, call Run on it;
		// dynamic pipelines, however, are likely to be started manually
		if _, ok := p.consume[pipeline.Name()]; ok {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
			defer cancel()
			err = initializedDriver.Run(ctx, pipeline)
			if err != nil {
				return errors.E(op, err)
			}
		}
	}

	// save the pipeline
	p.pipelines.Store(pipeline.Name(), pipeline)

	return nil
}

// Destroy a pipeline and release all associated resources.
func (p *Plugin) Destroy(pp string) error {
	const op = errors.Op("jobs_plugin_destroy")
	pipe, ok := p.pipelines.Load(pp)
	if !ok {
		return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
	}

	// type conversion
	ppl := pipe.(*pipeline.Pipeline)

	d, ok := p.consumers[ppl.Name()]
	if !ok {
		return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
	}

	// delete consumer
	delete(p.consumers, ppl.Name())
	p.pipelines.Delete(pp)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
	defer cancel()

	return d.Stop(ctx)
}

func (p *Plugin) List() []string {
	out := make([]string, 0, 10)

	p.pipelines.Range(func(key, _ interface{}) bool {
		// keys are always stored as strings, so the assertion is safe
		out = append(out, key.(string))
		return true
	})

	return out
}

func (p *Plugin) RPC() interface{} {
	return &rpc{
		log: p.log,
		p:   p,
	}
}

func (p *Plugin) collectJobsEvents(event interface{}) {
	if jev, ok := event.(events.JobEvent); ok {
		switch jev.Event {
		case events.EventPipePaused:
			p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobStart:
			p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobOK:
			p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPushOK:
			p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPushError:
			p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventJobError:
			p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeActive:
			p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeStopped:
			p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventPipeError:
			p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventDriverReady:
			p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
		case events.EventInitialized:
			p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC())
		}
	}
}

func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
	pld := p.pldPool.Get().(*payload.Payload)
	pld.Body = body
	pld.Context = context
	return pld
}

func (p *Plugin) putPayload(pld *payload.Payload) {
	pld.Body = nil
	pld.Context = nil
	p.pldPool.Put(pld)
}
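The pollers above assume that queue.ExtractMin blocks until a job is available and always yields the entry with the smallest priority value first. As a minimal sketch of that contract (a generic blocking min-heap built on container/heap and sync.Cond, not roadrunner's actual BinHeap implementation; item and BlockingQueue are illustrative names):

package main

import (
	"container/heap"
	"fmt"
	"sync"
)

type item struct {
	priority uint64
	body     string
}

type minHeap []item

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].priority < h[j].priority }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// BlockingQueue wraps the heap with a cond var so ExtractMin can wait.
type BlockingQueue struct {
	mu   sync.Mutex
	cond *sync.Cond
	h    minHeap
}

func NewBlockingQueue() *BlockingQueue {
	q := &BlockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *BlockingQueue) Insert(it item) {
	q.mu.Lock()
	heap.Push(&q.h, it)
	q.mu.Unlock()
	q.cond.Signal() // wake one waiting consumer
}

// ExtractMin blocks until an item is available, then returns the
// item with the smallest priority value.
func (q *BlockingQueue) ExtractMin() item {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.h.Len() == 0 {
		q.cond.Wait()
	}
	return heap.Pop(&q.h).(item)
}

func main() {
	q := NewBlockingQueue()
	q.Insert(item{priority: 10, body: "low"})
	q.Insert(item{priority: 1, body: "high"})
	fmt.Println(q.ExtractMin().body) // prints "high"
}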
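Shutdown works by pairing the select-on-stopCh in each poller with one stop token per poller sent from Stop. Because the sends happen in a helper goroutine, a poller that never returns to its select cannot hang the shutdown path. A reduced, self-contained sketch of that pattern (queue extraction and job handling are stubbed out; numPollers and the sleep are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const numPollers = 4
	stopCh := make(chan struct{}, 1)
	var wg sync.WaitGroup

	for i := 0; i < numPollers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-stopCh:
					fmt.Printf("poller %d stopped\n", id)
					return
				default:
					// stand-in for p.queue.ExtractMin() + job execution
					time.Sleep(10 * time.Millisecond)
				}
			}
		}(i)
	}

	// shutdown: one token per poller, sent from a goroutine so a
	// blocked send never hangs the caller
	go func() {
		for i := 0; i < numPollers; i++ {
			stopCh <- struct{}{}
		}
	}()

	wg.Wait()
}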
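The getPayload/putPayload pair is a standard sync.Pool reuse pattern: fields are cleared before the object goes back to the pool, so a recycled payload can never leak the previous job's data into the next execution. A minimal sketch of the same pattern, using a simplified Payload stand-in rather than roadrunner's payload.Payload:

package main

import (
	"fmt"
	"sync"
)

// Payload is a simplified stand-in for roadrunner's payload.Payload.
type Payload struct {
	Context []byte
	Body    []byte
}

var pldPool = sync.Pool{New: func() interface{} {
	return &Payload{} // allocated with nil fields
}}

func getPayload(body, context []byte) *Payload {
	pld := pldPool.Get().(*Payload)
	pld.Body = body
	pld.Context = context
	return pld
}

func putPayload(pld *Payload) {
	// clear the fields so the next Get never observes stale data
	pld.Body = nil
	pld.Context = nil
	pldPool.Put(pld)
}

func main() {
	exec := getPayload([]byte("job body"), []byte(`{"id":"1"}`))
	fmt.Printf("%s / %s\n", exec.Body, exec.Context)
	putPayload(exec)
}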