summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
committerValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
commitd099e47ab28dd044d34e18347a4c714b8af3d612 (patch)
treee106e13bba48e435b87d218237b282d7f691b52c /tests
parentec7c049036d31fe030d106db9f0d268ea0296c5f (diff)
SQS driver.
Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'tests')
-rw-r--r--tests/env/docker-compose-jobs.yml22
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml11
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-test.yaml105
-rw-r--r--tests/plugins/jobs/jobs_plugin_test.go78
4 files changed, 187 insertions, 29 deletions
diff --git a/tests/env/docker-compose-jobs.yml b/tests/env/docker-compose-jobs.yml
deleted file mode 100644
index 7b88c9cf..00000000
--- a/tests/env/docker-compose-jobs.yml
+++ /dev/null
@@ -1,22 +0,0 @@
-version: "3"
-
-services:
- beanstalk:
- image: schickling/beanstalkd
- ports:
- - "11300:11300"
-
- sqs:
- image: vsouza/sqs-local
- ports:
- - "9324:9324"
-
- rabbitmq:
- image: rabbitmq:3-management
- environment:
- RABBITMQ_DEFAULT_USER: guest
- RABBITMQ_DEFAULT_PASS: guest
- RABBITMQ_DEFAULT_VHOST: /
- ports:
- - "15672:15672"
- - "5672:5672" \ No newline at end of file
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 80826acc..63ddc70d 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -19,8 +19,6 @@ sqs:
secret: api-secret
region: us-west-1
endpoint: http://localhost:9324
- declare:
- MessageRetentionPeriod: 86400
logs:
level: info
@@ -62,6 +60,7 @@ jobs:
pipeline_size: 1000000
queue: test-1-queue
exchange: default
+ exclusive: true
exchange_type: direct
routing_key: test
@@ -81,12 +80,14 @@ jobs:
pipeline_size: 1000000
test-3:
- # priority: 11 - not defined, 10 by default
- # driver locality not specified, local by default
driver: sqs
pipeline_size: 1000000
queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
new file mode 100644
index 00000000..ee72c2b7
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -0,0 +1,105 @@
+rpc:
+ listen: unix:///tmp/rr.sock
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+ session_token: ""
+ ping_period: 10
+ attributes:
+ MessageRetentionPeriod: 86400
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 64
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-4:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
+
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
index 034ffc45..c06b74e4 100644
--- a/tests/plugins/jobs/jobs_plugin_test.go
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -16,8 +16,9 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/amqp"
- "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -28,6 +29,79 @@ import (
"github.com/stretchr/testify/require"
)
+func TestTEMP_INTI(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-test.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ &sqs.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 30000)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func TestJobsInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)