diff options
Diffstat (limited to 'tests/plugins/jobs')
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 6 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 10 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_memory_test.go (renamed from tests/plugins/jobs/jobs_ephemeral_test.go) | 90 | ||||
-rw-r--r-- | tests/plugins/jobs/memory/.rr-memory-declare.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml) | 0 | ||||
-rw-r--r-- | tests/plugins/jobs/memory/.rr-memory-init.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml) | 4 | ||||
-rw-r--r-- | tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml) | 0 | ||||
-rw-r--r-- | tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml) | 6 |
8 files changed, 58 insertions, 60 deletions
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml index a4f31290..71b51dce 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -7,9 +7,7 @@ server: relay_timeout: "20s" beanstalk: - # beanstalk address addr: tcp://127.0.0.1:11300 - # connect timeout timeout: 10s logs: diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bf9f60cc..9813344e 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -45,17 +45,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 prefetch: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 prefetch: 10000 diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 951d6227..5c521c2b 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -14,9 +14,9 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, &amqp.Plugin{}, ) assert.NoError(t, err) @@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) { &server.Plugin{}, &jobs.Plugin{}, &metrics.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, mockLogger, ) assert.NoError(t, err) @@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("DeclareEphemeralPipeline", declareMemoryPipe) + t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) t.Run("PushEphemeralPipeline", pushToPipe("test-3")) time.Sleep(time.Second) t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go index 2890aa9d..20cbfb3f 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -26,12 +26,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestEphemeralInit(t *testing.T) { +func TestMemoryInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-init.yaml", + Path: "memory/.rr-memory-init.yaml", Prefix: "rr", } @@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) { wg.Wait() } -func TestEphemeralDeclare(t *testing.T) { +func TestMemoryDeclare(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralPauseResume(t *testing.T) { +func TestMemoryPauseResume(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Path: "memory/.rr-memory-pause-resume.yaml", Prefix: "rr", } @@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) @@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("Resume", resumePipes("test-local")) + t.Run("Pause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("Resume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) time.Sleep(time.Second * 1) @@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) { wg.Wait() } -func TestEphemeralJobsError(t *testing.T) { +func TestMemoryJobsError(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", + Path: "memory/.rr-memory-jobs-err.yaml", Prefix: "rr", } @@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second * 25) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralStats(t *testing.T) { +func TestMemoryStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(1)) @@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) { assert.Equal(t, out.Reserved, int64(0)) time.Sleep(time.Second) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) time.Sleep(time.Second * 7) out = &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(0)) @@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) { wg.Wait() } -func declareEphemeralPipe(t *testing.T) { +func declareMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", + "driver": "memory", "name": "test-3", "prefetch": "10000", }} @@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) { assert.NoError(t, err) } -func consumeEphemeralPipe(t *testing.T) { +func consumeMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml index 726c24ac..726c24ac 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml index 8914dfaa..9ee8afc2 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml @@ -22,12 +22,12 @@ jobs: pipelines: test-1: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-2: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml index 05dc3ffa..05dc3ffa 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml index e1b76263..1ad48237 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml @@ -25,17 +25,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 pipeline_size: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 pipeline_size: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 pipeline_size: 10000 |