diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
commit | c35fbff05205330ab8e49f6008fdbd59128cee14 (patch) | |
tree | d3eb03e8db7231d97ae4ff1d60a0c5a50db8a6fb /tests/plugins/jobs/jobs_general_test.go | |
parent | 1d092e57afb55a01283b41942ca3ef15a7e4bdef (diff) |
Add prometheus metrics for the jobs, update tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests/plugins/jobs/jobs_general_test.go')
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 22b80157..5d8d8d9c 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -1,6 +1,8 @@ package jobs import ( + "io/ioutil" + "net/http" "os" "os/signal" "sync" @@ -15,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -119,3 +122,125 @@ func TestJobsInit(t *testing.T) { stopCh <- struct{}{} wg.Wait() } + +func TestJOBSMetrics(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = "configs/.rr-jobs-metrics.yaml" + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &server.Plugin{}, + &jobs.Plugin{}, + &metrics.Plugin{}, + &ephemeral.Plugin{}, + mockLogger, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + + go func() { + defer tt.Stop() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 5) + + genericOut, err := get() + assert.NoError(t, err) + + assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`) + assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`) + assert.Contains(t, genericOut, `rr_jobs_push_err 0`) + assert.Contains(t, genericOut, `rr_jobs_push_ok 3`) + assert.Contains(t, genericOut, "workers_memory_bytes") + + close(sig) + time.Sleep(time.Second * 2) +} + +const getAddr = "http://127.0.0.1:2112/metrics" + +// get request and return body +func get() (string, error) { + r, err := http.Get(getAddr) + if err != nil { + return "", err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", err + } + + err = r.Body.Close() + if err != nil { + return "", err + } + // unsafe + return string(b), err +} |