diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
commit | c35fbff05205330ab8e49f6008fdbd59128cee14 (patch) | |
tree | d3eb03e8db7231d97ae4ff1d60a0c5a50db8a6fb | |
parent | 1d092e57afb55a01283b41942ca3ef15a7e4bdef (diff) |
Add prometheus metrics for the jobs, update tests
Signed-off-by: Valery Piashchynski <[email protected]>
30 files changed, 450 insertions, 27 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go index f8426b12..dab9346c 100644 --- a/pkg/bst/bst.go +++ b/pkg/bst/bst.go @@ -88,7 +88,7 @@ func (b *BST) Remove(uuid string, topic string) { b.removeHelper(uuid, topic, nil) } -func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { curr := b for curr != nil { if topic < curr.topic { //nolint:gocritic diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go index 300f6748..f65ede67 100644 --- a/pkg/events/jobs_events.go +++ b/pkg/events/jobs_events.go @@ -6,7 +6,7 @@ import ( const ( // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK = iota + 12000 + EventPushOK J = iota + 12000 // EventPushError caused when job can not be registered. EventPushError @@ -58,6 +58,8 @@ func (ev J) String() string { return "EventPipeError" case EventDriverReady: return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" } return UnknownEventType } @@ -67,16 +69,12 @@ type JobEvent struct { Event J // String is job id. ID string - // Pipeline name Pipeline string - // Associated driver name (amqp, ephemeral, etc) Driver string - // Error for the jobs/pipes errors Error error - // event timings Start time.Time Elapsed time.Duration diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go index d90118c3..56050084 100644 --- a/pkg/state/job/state.go +++ b/pkg/state/job/state.go @@ -14,4 +14,6 @@ type State struct { Delayed int64 // Reserved jobs which are in the driver but not consumed yet Reserved int64 + // Status - 1 Ready, 0 - Paused + Ready bool } diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go index 5fa8e64e..c3352a52 100644 --- a/pkg/worker_handler/errors.go +++ b/pkg/worker_handler/errors.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package handler diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go index 390cc7d1..3c6c2186 100644 --- a/pkg/worker_handler/errors_windows.go +++ b/pkg/worker_handler/errors_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package handler diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index f3b40ae3..95df02ec 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -307,7 +307,8 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Driver: pipe.Driver(), Queue: q.Name, Active: int64(q.Messages), - Delayed: *j.delayed, + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), }, nil case <-ctx.Done(): @@ -501,3 +502,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return errors.E(op, errors.TimeOut, ctx.Err()) } } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 6a178dff..6323148b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -235,6 +235,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: j.tName, + Ready: ready(atomic.LoadUint32(&j.listeners)), } // set stat, skip errors (replace with 0) @@ -353,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { Start: time.Now(), }) } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 34778642..f0992cd6 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -113,6 +113,7 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { Queue: pipe.Name(), Active: atomic.LoadInt64(j.active), Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), }, nil } @@ -267,3 +268,7 @@ func (j *JobConsumer) consume() { } }() } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index d957b3eb..5d419b51 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -285,6 +285,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: *j.queueURL, + Ready: ready(atomic.LoadUint32(&j.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -418,3 +419,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return nil } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go new file mode 100644 index 00000000..61856a10 --- /dev/null +++ b/plugins/jobs/metrics.go @@ -0,0 +1,91 @@ +package jobs + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/informer" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p.statsExporter} +} + +const ( + namespace = "rr_jobs" +) + +type statsExporter struct { + workers informer.Informer + workersMemory uint64 + jobsOk uint64 + pushOk uint64 + jobsErr uint64 + pushErr uint64 +} + +var ( + worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil) + pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil) + pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil) + jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil) + jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil) +) + +func newStatsExporter(stats informer.Informer) *statsExporter { + return &statsExporter{ + workers: stats, + workersMemory: 0, + jobsOk: 0, + pushOk: 0, + jobsErr: 0, + pushErr: 0, + } +} + +func (se *statsExporter) metricsCallback(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { //nolint:exhaustive + case events.EventJobOK: + atomic.AddUint64(&se.jobsOk, 1) + case events.EventPushOK: + atomic.AddUint64(&se.pushOk, 1) + case events.EventPushError: + atomic.AddUint64(&se.pushErr, 1) + case events.EventJobError: + atomic.AddUint64(&se.jobsErr, 1) + } + } +} + +func (se *statsExporter) Describe(d chan<- *prometheus.Desc) { + // send description + d <- pushErr + d <- pushOk + d <- jobsErr + d <- jobsOk +} + +func (se *statsExporter) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := se.workers.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum)) + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk))) + ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr))) + ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk))) + ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr))) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5f6c8b94..5e62c5c5 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -59,7 +59,8 @@ type Plugin struct { stopCh chan struct{} // internal payloads pool - pldPool sync.Pool + pldPool sync.Pool + statsExporter *statsExporter } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -105,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log + // metrics + p.statsExporter = newStatsExporter(p) + p.events.AddListener(p.statsExporter.metricsCallback) + return nil } @@ -202,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit 5. Pipeline name */ + start := time.Now() + p.events.Push(events.JobEvent{ + Event: events.EventJobStart, + ID: jb.ID(), + Start: start, + Elapsed: 0, + }) + ctx, err := jb.Context() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) @@ -220,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { @@ -237,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.putPayload(exec) err = jb.Ack() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.log.Error("acknowledge error, job might be missed", "error", err) continue } + + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) } // handle the response protocol err = handleResponse(resp.Body, jb, p.log) if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.putPayload(exec) errNack := jb.Nack() if errNack != nil { @@ -256,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // return payload p.putPayload(exec) } @@ -359,7 +410,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) if err != nil { return errors.E(op, err) } @@ -372,6 +423,7 @@ func (p *Plugin) Reset() error { func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { @@ -397,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error { err := d.Push(ctx, j) if err != nil { - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushOK, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return nil } @@ -410,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -432,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error { err := d.Push(ctx, j[i]) if err != nil { cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j[i].Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } @@ -576,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) { case events.EventPipePaused: p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: - p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: - p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushOK: p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushError: - p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobError: - p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeActive: p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeStopped: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 2750cd8f..94f903d5 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -129,6 +129,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { Active: state[i].Active, Delayed: state[i].Delayed, Reserved: state[i].Reserved, + Ready: state[i].Ready, }) } diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go index d83a401a..bd8e3b43 100644 --- a/proto/jobs/v1beta/jobs.pb.go +++ b/proto/jobs/v1beta/jobs.pb.go @@ -499,6 +499,7 @@ type Stat struct { Active int64 `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"` Delayed int64 `protobuf:"varint,5,opt,name=delayed,proto3" json:"delayed,omitempty"` Reserved int64 `protobuf:"varint,6,opt,name=reserved,proto3" json:"reserved,omitempty"` + Ready bool `protobuf:"varint,7,opt,name=ready,proto3" json:"ready,omitempty"` } func (x *Stat) Reset() { @@ -575,6 +576,13 @@ func (x *Stat) GetReserved() int64 { return 0 } +func (x *Stat) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + var File_jobs_proto protoreflect.FileDescriptor var file_jobs_proto_rawDesc = []byte{ @@ -626,7 +634,7 @@ var file_jobs_proto_rawDesc = []byte{ 0x75, 0x65, 0x22, 0x30, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x27, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x52, 0x05, 0x53, - 0x74, 0x61, 0x74, 0x73, 0x22, 0x9e, 0x01, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x1a, 0x0a, + 0x74, 0x61, 0x74, 0x73, 0x22, 0xb4, 0x01, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x72, 0x69, 0x76, 0x65, @@ -636,8 +644,10 @@ var file_jobs_proto_rawDesc = []byte{ 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x64, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, - 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x72, 0x76, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, + 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index 5d1f2dfe..c030c0df 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -48,7 +48,7 @@ message Stats { repeated Stat Stats = 1; } -// Stats used as a response for the Stats RPC call +// Stat used as a response for the Stats RPC call message Stat { string pipeline = 1; string driver = 2; @@ -56,4 +56,5 @@ message Stat { int64 active = 4; int64 delayed = 5; int64 reserved = 6; + bool ready = 7; } diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml index 4735070a..d93bc5af 100644 --- a/tests/env/docker-compose.yaml +++ b/tests/env/docker-compose.yaml @@ -37,3 +37,8 @@ services: ports: - "127.0.0.1:15672:15672" - "127.0.0.1:5672:5672" + + prometheus: + image: prom/prometheus + ports: + - "9090:9090" diff --git a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml new file mode 100644 index 00000000..4db9a676 --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +metrics: + address: 127.0.0.1:2112 + +logs: + level: info + encoding: console + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 1 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5ec81cec..5067ef9f 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -229,5 +229,6 @@ func stats(state *jobState.State) func(t *testing.T) { state.Active = st.Stats[0].Active state.Delayed = st.Stats[0].Delayed state.Reserved = st.Stats[0].Reserved + state.Ready = st.Stats[0].Ready } } diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index c35244c5..df84fabc 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -134,6 +134,10 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -229,6 +233,10 @@ func TestAMQPJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) @@ -354,8 +362,11 @@ func TestAMQPStats(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() @@ -437,6 +448,7 @@ func TestAMQPStats(t *testing.T) { assert.Equal(t, int64(1), out.Active) assert.Equal(t, int64(1), out.Delayed) assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) time.Sleep(time.Second) t.Run("ResumePipeline", resumePipes("test-3")) @@ -452,6 +464,7 @@ func TestAMQPStats(t *testing.T) { assert.Equal(t, int64(0), out.Active) assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) time.Sleep(time.Second) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index ba798ee0..0ba69d2a 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -135,6 +135,10 @@ func TestBeanstalkDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() @@ -236,6 +240,10 @@ func TestBeanstalkJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() @@ -335,6 +343,10 @@ func TestBeanstalkStats(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index f793da6d..162579c8 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -130,6 +130,10 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -229,6 +233,10 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -323,6 +331,10 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) @@ -418,6 +430,10 @@ func TestEphemeralStats(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 22b80157..5d8d8d9c 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -1,6 +1,8 @@ package jobs import ( + "io/ioutil" + "net/http" "os" "os/signal" "sync" @@ -15,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -119,3 +122,125 @@ func TestJobsInit(t *testing.T) { stopCh <- struct{}{} wg.Wait() } + +func TestJOBSMetrics(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = "configs/.rr-jobs-metrics.yaml" + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &server.Plugin{}, + &jobs.Plugin{}, + &metrics.Plugin{}, + &ephemeral.Plugin{}, + mockLogger, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + + go func() { + defer tt.Stop() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 5) + + genericOut, err := get() + assert.NoError(t, err) + + assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`) + assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`) + assert.Contains(t, genericOut, `rr_jobs_push_err 0`) + assert.Contains(t, genericOut, `rr_jobs_push_ok 3`) + assert.Contains(t, genericOut, "workers_memory_bytes") + + close(sig) + time.Sleep(time.Second * 2) +} + +const getAddr = "http://127.0.0.1:2112/metrics" + +// get request and return body +func get() (string, error) { + r, err := http.Get(getAddr) + if err != nil { + return "", err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", err + } + + err = r.Body.Close() + if err != nil { + return "", err + } + // unsafe + return string(b), err +} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 191c8212..839e1aaa 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -135,6 +135,10 @@ func TestSQSDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -234,6 +238,10 @@ func TestSQSJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() @@ -361,9 +369,12 @@ func TestSQSStat(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index bf71ed9e..f50d34cc 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -56,6 +56,12 @@ func TestDurabilityAMQP(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-1", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-2", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) // redial errors @@ -176,6 +182,9 @@ func TestDurabilitySQS(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() @@ -246,7 +255,7 @@ func TestDurabilitySQS(t *testing.T) { time.Sleep(time.Second * 3) go func() { - time.Sleep(time.Second * 1) + time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) @@ -292,9 +301,12 @@ func TestDurabilityBeanstalk(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Info("beanstalk redial was successful").MinTimes(2) mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() diff --git a/tests/plugins/metrics/docker-compose.yml b/tests/plugins/metrics/docker-compose.yml deleted file mode 100644 index 610633b4..00000000 --- a/tests/plugins/metrics/docker-compose.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: '3.7' - -services: - prometheus: - image: prom/prometheus - ports: - - 9090:9090 diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go index 8948a458..ddf54520 100644 --- a/tests/plugins/service/service_plugin_test.go +++ b/tests/plugins/service/service_plugin_test.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package service diff --git a/tests/worker.php b/tests/temporal-worker.php index 5c9c80e6..5c9c80e6 100644 --- a/tests/worker.php +++ b/tests/temporal-worker.php diff --git a/utils/isolate.go b/utils/isolate.go index b05f4b3a..202f538c 100755 --- a/utils/isolate.go +++ b/utils/isolate.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package utils diff --git a/utils/isolate_win.go b/utils/isolate_win.go index b2b213a8..6b6d22e0 100755 --- a/utils/isolate_win.go +++ b/utils/isolate_win.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package utils diff --git a/utils/network.go b/utils/network.go index 86a7e733..301e2bab 100755 --- a/utils/network.go +++ b/utils/network.go @@ -1,3 +1,4 @@ +//go:build linux || darwin || freebsd // +build linux darwin freebsd package utils diff --git a/utils/network_windows.go b/utils/network_windows.go index 6eefb8f7..88e0fdb6 100755 --- a/utils/network_windows.go +++ b/utils/network_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package utils |