From c35fbff05205330ab8e49f6008fdbd59128cee14 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 18 Aug 2021 16:13:49 +0300 Subject: Add prometheus metrics for the jobs, update tests Signed-off-by: Valery Piashchynski --- tests/env/docker-compose.yaml | 5 + tests/plugins/jobs/configs/.rr-jobs-metrics.yaml | 27 +++++ tests/plugins/jobs/helpers.go | 1 + tests/plugins/jobs/jobs_amqp_test.go | 13 +++ tests/plugins/jobs/jobs_beanstalk_test.go | 12 +++ tests/plugins/jobs/jobs_ephemeral_test.go | 16 +++ tests/plugins/jobs/jobs_general_test.go | 125 +++++++++++++++++++++++ tests/plugins/jobs/jobs_sqs_test.go | 11 ++ tests/plugins/jobs/jobs_with_toxics_test.go | 14 ++- tests/plugins/metrics/docker-compose.yml | 7 -- tests/plugins/service/service_plugin_test.go | 1 + tests/temporal-worker.php | 34 ++++++ tests/worker.php | 34 ------ 13 files changed, 258 insertions(+), 42 deletions(-) create mode 100644 tests/plugins/jobs/configs/.rr-jobs-metrics.yaml delete mode 100644 tests/plugins/metrics/docker-compose.yml create mode 100644 tests/temporal-worker.php delete mode 100644 tests/worker.php (limited to 'tests') diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml index 4735070a..d93bc5af 100644 --- a/tests/env/docker-compose.yaml +++ b/tests/env/docker-compose.yaml @@ -37,3 +37,8 @@ services: ports: - "127.0.0.1:15672:15672" - "127.0.0.1:5672:5672" + + prometheus: + image: prom/prometheus + ports: + - "9090:9090" diff --git a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml new file mode 100644 index 00000000..4db9a676 --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +metrics: + address: 127.0.0.1:2112 + +logs: + level: info + encoding: console + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 1 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5ec81cec..5067ef9f 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -229,5 +229,6 @@ func stats(state *jobState.State) func(t *testing.T) { state.Active = st.Stats[0].Active state.Delayed = st.Stats[0].Delayed state.Reserved = st.Stats[0].Reserved + state.Ready = st.Stats[0].Ready } } diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index c35244c5..df84fabc 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -134,6 +134,10 @@ func TestAMQPDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -229,6 +233,10 @@ func TestAMQPJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) @@ -354,8 +362,11 @@ func TestAMQPStats(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() @@ -437,6 +448,7 @@ func TestAMQPStats(t *testing.T) { assert.Equal(t, int64(1), out.Active) assert.Equal(t, int64(1), out.Delayed) assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) time.Sleep(time.Second) t.Run("ResumePipeline", resumePipes("test-3")) @@ -452,6 +464,7 @@ func TestAMQPStats(t *testing.T) { assert.Equal(t, int64(0), out.Active) assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) time.Sleep(time.Second) t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index ba798ee0..0ba69d2a 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -135,6 +135,10 @@ func TestBeanstalkDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() @@ -236,6 +240,10 @@ func TestBeanstalkJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() @@ -335,6 +343,10 @@ func TestBeanstalkStats(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index f793da6d..162579c8 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -130,6 +130,10 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -229,6 +233,10 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -323,6 +331,10 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) @@ -418,6 +430,10 @@ func TestEphemeralStats(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 22b80157..5d8d8d9c 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -1,6 +1,8 @@ package jobs import ( + "io/ioutil" + "net/http" "os" "os/signal" "sync" @@ -15,6 +17,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -119,3 +122,125 @@ func TestJobsInit(t *testing.T) { stopCh <- struct{}{} wg.Wait() } + +func TestJOBSMetrics(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = "configs/.rr-jobs-metrics.yaml" + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &server.Plugin{}, + &jobs.Plugin{}, + &metrics.Plugin{}, + &ephemeral.Plugin{}, + mockLogger, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + + go func() { + defer tt.Stop() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) + time.Sleep(time.Second) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 5) + + genericOut, err := get() + assert.NoError(t, err) + + assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`) + assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`) + assert.Contains(t, genericOut, `rr_jobs_push_err 0`) + assert.Contains(t, genericOut, `rr_jobs_push_ok 3`) + assert.Contains(t, genericOut, "workers_memory_bytes") + + close(sig) + time.Sleep(time.Second * 2) +} + +const getAddr = "http://127.0.0.1:2112/metrics" + +// get request and return body +func get() (string, error) { + r, err := http.Get(getAddr) + if err != nil { + return "", err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", err + } + + err = r.Body.Close() + if err != nil { + return "", err + } + // unsafe + return string(b), err +} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 191c8212..839e1aaa 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -135,6 +135,10 @@ func TestSQSDeclare(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -234,6 +238,10 @@ func TestSQSJobsError(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() @@ -361,9 +369,12 @@ func TestSQSStat(t *testing.T) { mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index bf71ed9e..f50d34cc 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -56,6 +56,12 @@ func TestDurabilityAMQP(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-1", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-2", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) // redial errors @@ -176,6 +182,9 @@ func TestDurabilitySQS(t *testing.T) { mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() @@ -246,7 +255,7 @@ func TestDurabilitySQS(t *testing.T) { time.Sleep(time.Second * 3) go func() { - time.Sleep(time.Second * 1) + time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) time.Sleep(time.Second) t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) @@ -292,9 +301,12 @@ func TestDurabilityBeanstalk(t *testing.T) { mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) // redial errors mockLogger.EXPECT().Info("beanstalk redial was successful").MinTimes(2) mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() diff --git a/tests/plugins/metrics/docker-compose.yml b/tests/plugins/metrics/docker-compose.yml deleted file mode 100644 index 610633b4..00000000 --- a/tests/plugins/metrics/docker-compose.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: '3.7' - -services: - prometheus: - image: prom/prometheus - ports: - - 9090:9090 diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go index 8948a458..ddf54520 100644 --- a/tests/plugins/service/service_plugin_test.go +++ b/tests/plugins/service/service_plugin_test.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package service diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php new file mode 100644 index 00000000..5c9c80e6 --- /dev/null +++ b/tests/temporal-worker.php @@ -0,0 +1,34 @@ + + */ +$getClasses = static function (string $dir): iterable { + $files = glob($dir . '/*.php'); + + foreach ($files as $file) { + yield substr(basename($file), 0, -4); + } +}; + +$factory = \Temporal\WorkerFactory::create(); + +$worker = $factory->newWorker('default'); + +// register all workflows +foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { + $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); +} + +// register all activity +foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { + $class = 'Temporal\\Tests\\Activity\\' . $name; + $worker->registerActivityImplementations(new $class); +} + +$factory->run(); diff --git a/tests/worker.php b/tests/worker.php deleted file mode 100644 index 5c9c80e6..00000000 --- a/tests/worker.php +++ /dev/null @@ -1,34 +0,0 @@ - - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { - $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { - $class = 'Temporal\\Tests\\Activity\\' . $name; - $worker->registerActivityImplementations(new $class); -} - -$factory->run(); -- cgit v1.2.3