summaryrefslogtreecommitdiff
path: root/tests/plugins/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/plugins/jobs')
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml6
-rw-r--r--tests/plugins/jobs/jobs_general_test.go10
-rw-r--r--tests/plugins/jobs/jobs_memory_test.go (renamed from tests/plugins/jobs/jobs_ephemeral_test.go)90
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-declare.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-init.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml)4
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml)6
8 files changed, 58 insertions, 60 deletions
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
index a4f31290..71b51dce 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
@@ -7,9 +7,7 @@ server:
relay_timeout: "20s"
beanstalk:
- # beanstalk address
addr: tcp://127.0.0.1:11300
- # connect timeout
timeout: 10s
logs:
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index bf9f60cc..9813344e 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -45,17 +45,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
prefetch: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
prefetch: 10000
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index 951d6227..5c521c2b 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -14,9 +14,9 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
&amqp.Plugin{},
)
assert.NoError(t, err)
@@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) {
&server.Plugin{},
&jobs.Plugin{},
&metrics.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
mockLogger,
)
assert.NoError(t, err)
@@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) {
time.Sleep(time.Second * 2)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("DeclareEphemeralPipeline", declareMemoryPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe)
t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go
index 2890aa9d..20cbfb3f 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_memory_test.go
@@ -15,9 +15,9 @@ import (
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -26,12 +26,12 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestEphemeralInit(t *testing.T) {
+func TestMemoryInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-init.yaml",
+ Path: "memory/.rr-memory-init.yaml",
Prefix: "rr",
}
@@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) {
wg.Wait()
}
-func TestEphemeralDeclare(t *testing.T) {
+func TestMemoryDeclare(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralPauseResume(t *testing.T) {
+func TestMemoryPauseResume(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-pause-resume.yaml",
+ Path: "memory/.rr-memory-pause-resume.yaml",
Prefix: "rr",
}
@@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) {
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
@@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("ephemeralResume", resumePipes("test-local"))
- t.Run("ephemeralPause", pausePipelines("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
+ t.Run("Pause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
- t.Run("ephemeralResume", resumePipes("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
t.Run("pushToEnabledPipe", pushToPipe("test-local"))
time.Sleep(time.Second * 1)
@@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) {
wg.Wait()
}
-func TestEphemeralJobsError(t *testing.T) {
+func TestMemoryJobsError(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-jobs-err.yaml",
+ Path: "memory/.rr-memory-jobs-err.yaml",
Prefix: "rr",
}
@@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", resumePipes("test-3"))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second * 25)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralStats(t *testing.T) {
+func TestMemoryStats(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("PushPipeline", pushToPipeDelayed("test-3", 5))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
out := &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(1))
@@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) {
assert.Equal(t, out.Reserved, int64(0))
time.Sleep(time.Second)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
time.Sleep(time.Second * 7)
out = &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(0))
@@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) {
wg.Wait()
}
-func declareEphemeralPipe(t *testing.T) {
+func declareMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "ephemeral",
+ "driver": "memory",
"name": "test-3",
"prefetch": "10000",
}}
@@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) {
assert.NoError(t, err)
}
-func consumeEphemeralPipe(t *testing.T) {
+func consumeMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
index 726c24ac..726c24ac 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml
index 8914dfaa..9ee8afc2 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml
@@ -22,12 +22,12 @@ jobs:
pipelines:
test-1:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-2:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
index 05dc3ffa..05dc3ffa 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
index e1b76263..1ad48237 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
@@ -25,17 +25,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
pipeline_size: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
pipeline_size: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
pipeline_size: 10000