diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 134 |
1 files changed, 126 insertions, 8 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 7707cb8a..5e62c5c5 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -13,6 +13,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pool" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" @@ -57,7 +59,8 @@ type Plugin struct { stopCh chan struct{} // internal payloads pool - pldPool sync.Pool + pldPool sync.Pool + statsExporter *statsExporter } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -103,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log + // metrics + p.statsExporter = newStatsExporter(p) + p.events.AddListener(p.statsExporter.metricsCallback) + return nil } @@ -200,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit 5. Pipeline name */ + start := time.Now() + p.events.Push(events.JobEvent{ + Event: events.EventJobStart, + ID: jb.ID(), + Start: start, + Elapsed: 0, + }) + ctx, err := jb.Context() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) @@ -218,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { @@ -235,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.putPayload(exec) err = jb.Ack() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.log.Error("acknowledge error, job might be missed", "error", err) continue } + + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) } // handle the response protocol err = handleResponse(resp.Body, jb, p.log) if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.putPayload(exec) errNack := jb.Nack() if errNack != nil { @@ -254,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // return payload p.putPayload(exec) } @@ -303,6 +356,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) { p.jobConstructors[name.Name()] = c } +func (p *Plugin) Workers() []*process.State { + p.RLock() + wrk := p.workersPool.Workers() + p.RUnlock() + + ps := make([]*process.State, len(wrk)) + + for i := 0; i < len(wrk); i++ { + st, err := process.WorkerProcessState(wrk[i]) + if err != nil { + p.log.Error("jobs workers state", "error", err) + return nil + } + + ps[i] = st + } + + return ps +} + +func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { + const op = errors.Op("jobs_plugin_drivers_state") + jst := make([]*jobState.State, 0, len(p.consumers)) + for k := range p.consumers { + d := p.consumers[k] + newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) + state, err := d.State(newCtx) + if err != nil { + cancel() + return nil, errors.E(op, err) + } + + jst = append(jst, state) + cancel() + } + return jst, nil +} + func (p *Plugin) Available() {} func (p *Plugin) Name() string { @@ -319,7 +410,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) if err != nil { return errors.E(op, err) } @@ -332,6 +423,7 @@ func (p *Plugin) Reset() error { func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { @@ -357,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error { err := d.Push(ctx, j) if err != nil { - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushOK, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return nil } @@ -370,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -392,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error { err := d.Push(ctx, j[i]) if err != nil { cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j[i].Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } @@ -536,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) { case events.EventPipePaused: p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: - p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: - p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushOK: p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushError: - p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobError: - p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeActive: p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeStopped: |