diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 719 |
1 file changed, 0 insertions, 719 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go deleted file mode 100644 index 3aec6acc..00000000 --- a/plugins/jobs/plugin.go +++ /dev/null @@ -1,719 +0,0 @@ -package jobs - -import ( - "context" - "fmt" - "sync" - "time" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/pool" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/job" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" -) - -const ( - // RrMode env variable - RrMode string = "RR_MODE" - RrModeJobs string = "jobs" - - PluginName string = "jobs" - pipelines string = "pipelines" -) - -type Plugin struct { - sync.RWMutex - - // Jobs plugin configuration - cfg *Config `structure:"jobs"` - log logger.Logger - workersPool pool.Pool - server server.Server - - jobConstructors map[string]jobs.Constructor - consumers sync.Map // map[string]jobs.Consumer - - // events handler - events events.Handler - - // priority queue implementation - queue priorityqueue.Queue - - // parent config for broken options. 
keys are pipelines names, values - pointers to the associated pipeline - pipelines sync.Map - - // initial set of the pipelines to consume - consume map[string]struct{} - - // signal channel to stop the pollers - stopCh chan struct{} - - // internal payloads pool - pldPool sync.Pool - statsExporter *statsExporter -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { - const op = errors.Op("jobs_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - - p.cfg.InitDefaults() - - p.server = server - - p.events = events.NewEventsHandler() - p.events.AddListener(p.collectJobsEvents) - - p.jobConstructors = make(map[string]jobs.Constructor) - p.consume = make(map[string]struct{}) - p.stopCh = make(chan struct{}, 1) - - p.pldPool = sync.Pool{New: func() interface{} { - // with nil fields - return &payload.Payload{} - }} - - // initial set of pipelines - for i := range p.cfg.Pipelines { - p.pipelines.Store(i, p.cfg.Pipelines[i]) - } - - if len(p.cfg.Consume) > 0 { - for i := 0; i < len(p.cfg.Consume); i++ { - p.consume[p.cfg.Consume[i]] = struct{}{} - } - } - - // initialize priority queue - p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) - p.log = log - - // metrics - p.statsExporter = newStatsExporter(p) - p.events.AddListener(p.statsExporter.metricsCallback) - - return nil -} - -func (p *Plugin) Serve() chan error { //nolint:gocognit - errCh := make(chan error, 1) - const op = errors.Op("jobs_plugin_serve") - - // register initial pipelines - p.pipelines.Range(func(key, value interface{}) bool { - t := time.Now() - // pipeline name (ie test-local, sqs-aws, etc) - name := key.(string) - - // pipeline associated with the name - pipe := value.(*pipeline.Pipeline) - // driver for the pipeline (ie amqp, ephemeral, etc) - dr := pipe.Driver() - - // jobConstructors contains constructors for the 
drivers - // we need here to initialize these drivers for the pipelines - if _, ok := p.jobConstructors[dr]; ok { - // config key for the particular sub-driver jobs.pipelines.test-local - configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) - - // init the driver - initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) - if err != nil { - errCh <- errors.E(op, err) - return false - } - - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(name, initializedDriver) - - // register pipeline for the initialized driver - err = initializedDriver.Register(context.Background(), pipe) - if err != nil { - errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name())) - return false - } - - // if pipeline initialized to be consumed, call Run on it - if _, ok := p.consume[name]; ok { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - err = initializedDriver.Run(ctx, pipe) - if err != nil { - errCh <- errors.E(op, err) - return false - } - return true - } - - return true - } - - p.events.Push(events.JobEvent{ - Event: events.EventDriverReady, - Pipeline: pipe.Name(), - Driver: pipe.Driver(), - Start: t, - Elapsed: t.Sub(t), - }) - - return true - }) - - // do not continue processing, immediately stop if channel contains an error - if len(errCh) > 0 { - return errCh - } - - var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) - if err != nil { - errCh <- err - return errCh - } - - // start listening - go func() { - for i := uint8(0); i < p.cfg.NumPollers; i++ { - go func() { - for { - select { - case <-p.stopCh: - p.log.Info("------> job poller stopped <------") - return - default: - // get prioritized JOB from the queue - jb := p.queue.ExtractMin() - - // parse 
the context - // for each job, context contains: - /* - 1. Job class - 2. Job ID provided from the outside - 3. Job Headers map[string][]string - 4. Timeout in seconds - 5. Pipeline name - */ - - start := time.Now() - p.events.Push(events.JobEvent{ - Event: events.EventJobStart, - ID: jb.ID(), - Start: start, - Elapsed: 0, - }) - - ctx, err := jb.Context() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - Error: err, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) - } - p.log.Error("job marshal context", "error", err) - continue - } - - // get payload from the sync.Pool - exec := p.getPayload(jb.Body(), ctx) - - // protect from the pool reset - p.RLock() - resp, err := p.workersPool.Exec(exec) - p.RUnlock() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: jb.ID(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - // RR protocol level error, Nack the job - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed", "error", errNack) - } - - p.log.Error("job execute failed", "error", err) - - p.putPayload(exec) - continue - } - - // if response is nil or body is nil, just acknowledge the job - if resp == nil || resp.Body == nil { - p.putPayload(exec) - err = jb.Ack() - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: jb.ID(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - p.log.Error("acknowledge error, job might be missed", "error", err) - continue - } - - p.events.Push(events.JobEvent{ - Event: events.EventJobOK, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - continue - } - - // handle the response protocol - err = handleResponse(resp.Body, jb, p.log) - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventJobError, - ID: 
jb.ID(), - Start: start, - Error: err, - Elapsed: time.Since(start), - }) - p.putPayload(exec) - errNack := jb.Nack() - if errNack != nil { - p.log.Error("negatively acknowledge failed, job might be lost", "root error", err, "error nack", errNack) - continue - } - - p.log.Error("job negatively acknowledged", "error", err) - continue - } - - p.events.Push(events.JobEvent{ - Event: events.EventJobOK, - ID: jb.ID(), - Start: start, - Elapsed: time.Since(start), - }) - - // return payload - p.putPayload(exec) - } - } - }() - } - }() - - return errCh -} - -func (p *Plugin) Stop() error { - // range over all consumers and call stop - p.consumers.Range(func(key, value interface{}) bool { - consumer := value.(jobs.Consumer) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := consumer.Stop(ctx) - if err != nil { - cancel() - p.log.Error("stop job driver", "driver", key) - return true - } - cancel() - return true - }) - - // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, - // but if not, this is not a problem at all. 
- // The main target is to stop the drivers - go func() { - for i := uint8(0); i < p.cfg.NumPollers; i++ { - // stop jobs plugin pollers - p.stopCh <- struct{}{} - } - }() - - // just wait pollers for 5 seconds before exit - time.Sleep(time.Second * 5) - - p.Lock() - p.workersPool.Destroy(context.Background()) - p.Unlock() - - return nil -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectMQBrokers, - } -} - -func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { - p.jobConstructors[name.Name()] = c -} - -func (p *Plugin) Workers() []*process.State { - p.RLock() - wrk := p.workersPool.Workers() - p.RUnlock() - - ps := make([]*process.State, len(wrk)) - - for i := 0; i < len(wrk); i++ { - st, err := process.WorkerProcessState(wrk[i]) - if err != nil { - p.log.Error("jobs workers state", "error", err) - return nil - } - - ps[i] = st - } - - return ps -} - -func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { - const op = errors.Op("jobs_plugin_drivers_state") - jst := make([]*jobState.State, 0, 2) - var err error - p.consumers.Range(func(key, value interface{}) bool { - consumer := value.(jobs.Consumer) - newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) - - var state *jobState.State - state, err = consumer.State(newCtx) - if err != nil { - cancel() - return false - } - - jst = append(jst, state) - cancel() - return true - }) - - if err != nil { - return nil, errors.E(op, err) - } - return jst, nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Reset() error { - p.Lock() - defer p.Unlock() - - const op = errors.Op("jobs_plugin_reset") - p.log.Info("JOBS plugin received restart request. 
Restarting...") - p.workersPool.Destroy(context.Background()) - p.workersPool = nil - - var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) - if err != nil { - return errors.E(op, err) - } - - p.log.Info("JOBS workers pool successfully restarted") - - return nil -} - -func (p *Plugin) Push(j *job.Job) error { - const op = errors.Op("jobs_plugin_push") - - start := time.Now() - // get the pipeline for the job - pipe, ok := p.pipelines.Load(j.Options.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline)) - } - - // type conversion - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // if job has no priority, inherit it from the pipeline - if j.Options.Priority == 0 { - j.Options.Priority = ppl.Priority() - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - - err := d.(jobs.Consumer).Push(ctx, j) - if err != nil { - p.events.Push(events.JobEvent{ - Event: events.EventPushError, - ID: j.Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - return errors.E(op, err) - } - - p.events.Push(events.JobEvent{ - Event: events.EventPushOK, - ID: j.Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Error: err, - Start: start, - Elapsed: time.Since(start), - }) - - return nil -} - -func (p *Plugin) PushBatch(j []*job.Job) error { - const op = errors.Op("jobs_plugin_push") - start := time.Now() - - for i := 0; i < len(j); i++ { - // get the pipeline for the job - pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", 
j[i].Options.Pipeline)) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // if job has no priority, inherit it from the pipeline - if j[i].Options.Priority == 0 { - j[i].Options.Priority = ppl.Priority() - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.(jobs.Consumer).Push(ctx, j[i]) - if err != nil { - cancel() - p.events.Push(events.JobEvent{ - Event: events.EventPushError, - ID: j[i].Ident, - Pipeline: ppl.Name(), - Driver: ppl.Driver(), - Start: start, - Elapsed: time.Since(start), - Error: err, - }) - return errors.E(op, err) - } - - cancel() - } - - return nil -} - -func (p *Plugin) Pause(pp string) { - pipe, ok := p.pipelines.Load(pp) - - if !ok { - p.log.Error("no such pipeline", "requested", pp) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pp) - return - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - // redirect call to the underlying driver - d.(jobs.Consumer).Pause(ctx, ppl.Name()) -} - -func (p *Plugin) Resume(pp string) { - pipe, ok := p.pipelines.Load(pp) - if !ok { - p.log.Error("no such pipeline", "requested", pp) - } - - ppl := pipe.(*pipeline.Pipeline) - - d, ok := p.consumers.Load(ppl.Name()) - if !ok { - p.log.Warn("driver for the pipeline not found", "pipeline", pp) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - // redirect call to the underlying driver - d.(jobs.Consumer).Resume(ctx, ppl.Name()) -} - -// Declare a pipeline. 
-func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { - const op = errors.Op("jobs_plugin_declare") - // driver for the pipeline (ie amqp, ephemeral, etc) - dr := pipeline.Driver() - if dr == "" { - return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name())) - } - - // jobConstructors contains constructors for the drivers - // we need here to initialize these drivers for the pipelines - if _, ok := p.jobConstructors[dr]; ok { - // init the driver from pipeline - initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) - if err != nil { - return errors.E(op, err) - } - - // register pipeline for the initialized driver - err = initializedDriver.Register(context.Background(), pipeline) - if err != nil { - return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name())) - } - - // if pipeline initialized to be consumed, call Run on it - // but likely for the dynamic pipelines it should be started manually - if _, ok := p.consume[pipeline.Name()]; ok { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() - err = initializedDriver.Run(ctx, pipeline) - if err != nil { - return errors.E(op, err) - } - } - - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers.Store(pipeline.Name(), initializedDriver) - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) - } - - return nil -} - -// Destroy pipeline and release all associated resources. 
-func (p *Plugin) Destroy(pp string) error { - const op = errors.Op("jobs_plugin_destroy") - pipe, ok := p.pipelines.Load(pp) - if !ok { - return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp)) - } - - // type conversion - ppl := pipe.(*pipeline.Pipeline) - - // delete consumer - d, ok := p.consumers.LoadAndDelete(ppl.Name()) - if !ok { - return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) - } - - // delete old pipeline - p.pipelines.LoadAndDelete(pp) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.(jobs.Consumer).Stop(ctx) - if err != nil { - cancel() - return errors.E(op, err) - } - - cancel() - return nil -} - -func (p *Plugin) List() []string { - out := make([]string, 0, 10) - - p.pipelines.Range(func(key, _ interface{}) bool { - // we can safely convert value here as we know that we store keys as strings - out = append(out, key.(string)) - return true - }) - - return out -} - -func (p *Plugin) RPC() interface{} { - return &rpc{ - log: p.log, - p: p, - } -} - -func (p *Plugin) collectJobsEvents(event interface{}) { - if jev, ok := event.(events.JobEvent); ok { - switch jev.Event { - case events.EventPipePaused: - p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventJobStart: - p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventJobOK: - p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPushOK: - p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPushError: - p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case 
events.EventJobError: - p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeActive: - p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeStopped: - p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventPipeError: - p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - case events.EventDriverReady: - p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) - } - } -} - -func (p *Plugin) getPayload(body, context []byte) *payload.Payload { - pld := p.pldPool.Get().(*payload.Payload) - pld.Body = body - pld.Context = context - return pld -} - -func (p *Plugin) putPayload(pld *payload.Payload) { - pld.Body = nil - pld.Context = nil - p.pldPool.Put(pld) -} |