summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 16:13:49 +0300
committerValery Piashchynski <[email protected]>2021-08-18 16:13:49 +0300
commitc35fbff05205330ab8e49f6008fdbd59128cee14 (patch)
treed3eb03e8db7231d97ae4ff1d60a0c5a50db8a6fb /plugins
parent1d092e57afb55a01283b41942ca3ef15a7e4bdef (diff)
Add prometheus metrics for the jobs, update tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go7
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go5
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go5
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go5
-rw-r--r--plugins/jobs/metrics.go91
-rw-r--r--plugins/jobs/plugin.go94
-rw-r--r--plugins/jobs/rpc.go1
7 files changed, 199 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index f3b40ae3..95df02ec 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -307,7 +307,8 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
Driver: pipe.Driver(),
Queue: q.Name,
Active: int64(q.Messages),
- Delayed: *j.delayed,
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}, nil
case <-ctx.Done():
@@ -501,3 +502,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
return errors.E(op, errors.TimeOut, ctx.Err())
}
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 6a178dff..6323148b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -235,6 +235,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: j.tName,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}
// set stat, skip errors (replace with 0)
@@ -353,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 34778642..f0992cd6 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -113,6 +113,7 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
Queue: pipe.Name(),
Active: atomic.LoadInt64(j.active),
Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}, nil
}
@@ -267,3 +268,7 @@ func (j *JobConsumer) consume() {
}
}()
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index d957b3eb..5d419b51 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -285,6 +285,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: *j.queueURL,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -418,3 +419,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
return nil
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go
new file mode 100644
index 00000000..61856a10
--- /dev/null
+++ b/plugins/jobs/metrics.go
@@ -0,0 +1,91 @@
+package jobs
+
+import (
+ "sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+ // p - implements Exporter interface (workers)
+ // other - request duration and count
+ return []prometheus.Collector{p.statsExporter}
+}
+
+const (
+ namespace = "rr_jobs"
+)
+
+type statsExporter struct {
+ workers informer.Informer
+ workersMemory uint64
+ jobsOk uint64
+ pushOk uint64
+ jobsErr uint64
+ pushErr uint64
+}
+
+var (
+ worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
+ pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil)
+ pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil)
+ jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil)
+ jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
+)
+
+func newStatsExporter(stats informer.Informer) *statsExporter {
+ return &statsExporter{
+ workers: stats,
+ workersMemory: 0,
+ jobsOk: 0,
+ pushOk: 0,
+ jobsErr: 0,
+ pushErr: 0,
+ }
+}
+
+func (se *statsExporter) metricsCallback(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event { //nolint:exhaustive
+ case events.EventJobOK:
+ atomic.AddUint64(&se.jobsOk, 1)
+ case events.EventPushOK:
+ atomic.AddUint64(&se.pushOk, 1)
+ case events.EventPushError:
+ atomic.AddUint64(&se.pushErr, 1)
+ case events.EventJobError:
+ atomic.AddUint64(&se.jobsErr, 1)
+ }
+ }
+}
+
+func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
+ // send description
+ d <- pushErr
+ d <- pushOk
+ d <- jobsErr
+ d <- jobsOk
+}
+
+func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
+ // get the copy of the processes
+ workers := se.workers.Workers()
+
+ // cumulative RSS memory in bytes
+ var cum uint64
+
+ // collect the memory
+ for i := 0; i < len(workers); i++ {
+ cum += workers[i].MemoryUsage
+ }
+
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
+ ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
+ ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
+ ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5f6c8b94..5e62c5c5 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -59,7 +59,8 @@ type Plugin struct {
stopCh chan struct{}
// internal payloads pool
- pldPool sync.Pool
+ pldPool sync.Pool
+ statsExporter *statsExporter
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -105,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
p.log = log
+ // metrics
+ p.statsExporter = newStatsExporter(p)
+ p.events.AddListener(p.statsExporter.metricsCallback)
+
return nil
}
@@ -202,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
5. Pipeline name
*/
+ start := time.Now()
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobStart,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: 0,
+ })
+
ctx, err := jb.Context()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
@@ -220,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
resp, err := p.workersPool.Exec(exec)
p.RUnlock()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// RR protocol level error, Nack the job
errNack := jb.Nack()
if errNack != nil {
@@ -237,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.putPayload(exec)
err = jb.Ack()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.log.Error("acknowledge error, job might be missed", "error", err)
continue
}
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
}
// handle the response protocol
err = handleResponse(resp.Body, jb, p.log)
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.putPayload(exec)
errNack := jb.Nack()
if errNack != nil {
@@ -256,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// return payload
p.putPayload(exec)
}
@@ -359,7 +410,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
if err != nil {
return errors.E(op, err)
}
@@ -372,6 +423,7 @@ func (p *Plugin) Reset() error {
func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j.Options.Pipeline)
if !ok {
@@ -397,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error {
err := d.Push(ctx, j)
if err != nil {
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushOK,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return nil
}
@@ -410,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -432,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
err := d.Push(ctx, j[i])
if err != nil {
cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j[i].Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
@@ -576,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
case events.EventPipePaused:
p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
- p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
- p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushOK:
p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushError:
- p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobError:
- p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeActive:
p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeStopped:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 2750cd8f..94f903d5 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -129,6 +129,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
Active: state[i].Active,
Delayed: state[i].Delayed,
Reserved: state[i].Reserved,
+ Ready: state[i].Ready,
})
}