diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /tests/plugins/jobs | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'tests/plugins/jobs')
33 files changed, 0 insertions, 4460 deletions
diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml deleted file mode 100644 index f9a7308b..00000000 --- a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml +++ /dev/null @@ -1,24 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:[email protected]:5672/ - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml deleted file mode 100644 index 43840545..00000000 --- a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml +++ /dev/null @@ -1,55 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:[email protected]:5672/ - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 1 - pipeline_size: 100000 - timeout: 1 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: amqp - prefetch: 100 - queue: test-1-queue - priority: 1 - exchange: default - exchange_type: direct - routing_key: test-1 - exclusive: false - multiple_ack: false - requeue_on_fail: false - - test-2: - driver: amqp - prefetch: 100 - queue: test-2-queue - priority: 2 - exchange: default - exchange_type: direct - routing_key: test-2 - exclusive: false - multiple_ack: false - requeue_on_fail: false - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml deleted file mode 100644 index 79493d96..00000000 --- a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml +++ /dev/null @@ -1,24 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:[email protected]:5672/ - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 1 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/amqp/.rr-no-global.yaml b/tests/plugins/jobs/amqp/.rr-no-global.yaml deleted file mode 100644 index 1b01eb73..00000000 --- a/tests/plugins/jobs/amqp/.rr-no-global.yaml +++ /dev/null @@ -1,47 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: error - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-1: - driver: amqp - priority: 1 - pipeline_size: 100 - queue: test-1-queue - exchange: default - exchange_type: direct - routing_key: test - - test-2: - driver: amqp - priority: 2 - pipeline_size: 100 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml deleted file mode 100644 index 3555ef96..00000000 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml +++ /dev/null @@ -1,27 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -beanstalk: - # beanstalk address - addr: tcp://127.0.0.1:11300 - # connect timeout - timeout: 10s - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml deleted file mode 100644 index cf9069a8..00000000 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml +++ /dev/null @@ -1,45 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -beanstalk: - addr: tcp://127.0.0.1:11300 - timeout: 10s - -logs: - level: info - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: beanstalk - priority: 11 - tube_priority: 1 - tube: default-1 - reserve_timeout: 10s - - test-2: - driver: beanstalk - priority: 11 - tube_priority: 3 - tube: default-2 - reserve_timeout: 10s - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml deleted file mode 100644 index 71b51dce..00000000 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml +++ /dev/null @@ -1,25 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -beanstalk: - addr: tcp://127.0.0.1:11300 - timeout: 10s - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml deleted file mode 100644 index 92d090d4..00000000 --- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml +++ /dev/null @@ -1,34 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: error - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-1: - driver: beanstalk - priority: 11 - tube_priority: 1 - tube: default-1 - reserve_timeout: 10s - - consume: [ "test-1" ] - -endure: - log_level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml deleted file mode 100644 index cdc2655f..00000000 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml +++ /dev/null @@ -1,24 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -boltdb: - permissions: 0777 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml deleted file mode 100644 index 804db543..00000000 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml +++ /dev/null @@ -1,43 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -boltdb: - permissions: 0777 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 1 - pipeline_size: 100000 - timeout: 1 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: boltdb - prefetch: 100 - file: "rr1.db" - priority: 1 - - test-2: - driver: boltdb - prefetch: 100 - file: "rr2.db" - priority: 2 - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml deleted file mode 100644 index d375a9a5..00000000 --- a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml +++ /dev/null @@ -1,24 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -boltdb: - permissions: 0777 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 1 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml deleted file mode 100644 index 54aaf3c6..00000000 --- a/tests/plugins/jobs/boltdb/.rr-no-global.yaml +++ /dev/null @@ -1,41 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: error - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-1: - driver: boltdb - prefetch: 100 - file: "rr1.db" - priority: 1 - - test-2: - driver: boltdb - prefetch: 100 - file: "rr2.db" - priority: 1 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml deleted file mode 100644 index 9813344e..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ /dev/null @@ -1,112 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:[email protected]:5672/ - -# beanstalk configuration -# -beanstalk: - # beanstalk address - addr: tcp://127.0.0.1:11300 - # connect timeout - timeout: 10s - -# amazon sqs configuration -# General section -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://127.0.0.1:9324 - -logs: - level: info - encoding: console - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: memory - priority: 10 - prefetch: 10000 - - test-local-2: - driver: memory - priority: 1 - prefetch: 10000 - - test-local-3: - driver: memory - priority: 2 - prefetch: 10000 - - test-1: - driver: amqp - # QoS - prefetch: 1000000 - # Queue name - queue: test-1-queue - # Pipeline jobs priority, 1 - highest - priority: 1 - # Exchange - exchange: default - # Exchange type: direct, topic, fanout - exchange_type: direct - # Routing key for the queue - routing_key: test - # Declare a queue exclusive at the exchange - exclusive: false - # When multiple is true, this delivery and all prior unacknowledged deliveries - # on the same channel will be acknowledged. This is useful for batch processing - # of deliveries - multiple_ack: false - # When multiple is true, this delivery and all prior unacknowledged deliveries - # on the same channel will be acknowledged. This is useful for batch processing - # of deliveries - requeue_on_fail: false - - test-2-amqp: - driver: amqp - priority: 2 - prefetch: 1000000 - queue: test-2-queue - exchange: default - exchange_type: direct - routing_key: test-2 - - test-2: - driver: beanstalk - priority: 11 - tube: default - - test-3: - driver: sqs - prefetch: 1000000 - queue: default - attributes: - MessageRetentionPeriod: 86400 - tags: - test: "tag" - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ] - diff --git a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml deleted file mode 100644 index 4db9a676..00000000 --- a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml +++ /dev/null @@ -1,27 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -metrics: - address: 127.0.0.1:2112 - -logs: - level: info - encoding: console - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 1 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml deleted file mode 100644 index 2c4709ba..00000000 --- a/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml +++ /dev/null @@ -1,55 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -amqp: - addr: amqp://guest:[email protected]:23679/ - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - timeout: 1 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: amqp - prefetch: 100 - queue: test-1-queue - priority: 1 - exchange: default - exchange_type: direct - routing_key: test-1 - exclusive: false - multiple_ack: false - requeue_on_fail: false - - test-2: - driver: amqp - prefetch: 100 - queue: test-2-queue - priority: 2 - exchange: default - exchange_type: direct - routing_key: test-2 - exclusive: false - multiple_ack: false - requeue_on_fail: false - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml deleted file mode 100644 index 57d8ad2d..00000000 --- a/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml +++ /dev/null @@ -1,44 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -beanstalk: - addr: tcp://127.0.0.1:11400 - timeout: 10s - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: beanstalk - priority: 11 - tube_priority: 1 - tube: default-1 - reserve_timeout: 10s - - test-2: - driver: beanstalk - priority: 11 - tube_priority: 3 - tube: default-2 - reserve_timeout: 10s - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml deleted file mode 100644 index b6ba83a4..00000000 --- a/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml +++ /dev/null @@ -1,60 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://127.0.0.1:19324 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - timeout: 20 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: sqs - prefetch: 10 - visibility_timeout: 0 - wait_time_seconds: 1 - queue: default - # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 0 - tags: - test: "tag" - - test-2: - driver: sqs - prefetch: 10 - queue: default-2 - wait_time_seconds: 1 - attributes: - MessageRetentionPeriod: 86400 - tags: - test: "tag" - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go deleted file mode 100644 index 6c2d05ca..00000000 --- a/tests/plugins/jobs/helpers.go +++ /dev/null @@ -1,234 +0,0 @@ -package jobs - -import ( - "bytes" - "net" - "net/http" - "net/rpc" - "testing" - - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - push string = "jobs.Push" - pause string = "jobs.Pause" - destroy string = "jobs.Destroy" - resume string = "jobs.Resume" - stat string = "jobs.Stat" -) - -func resumePipes(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(resume, pipe, er) - assert.NoError(t, err) - } -} - -func pushToDisabledPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: nil, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipe(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: 0, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "2", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: delay, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - assert.NoError(t, err) - } -} - -func pushToPipeErr(pipeline string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - require.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ - Job: "some/php/namespace", - Id: "1", - Payload: `{"hello":"world"}`, - Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, - Options: &jobsv1beta.Options{ - Priority: 1, - Pipeline: pipeline, - Delay: 0, - }, - }} - - er := &jobsv1beta.Empty{} - err = client.Call(push, req, er) - require.Error(t, err) - } -} -func pausePipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(pause, pipe, er) - assert.NoError(t, err) - } -} - -func destroyPipelines(pipes ...string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} - - for i := 0; i < len(pipes); i++ { - pipe.GetPipelines()[i] = pipes[i] - } - - er := &jobsv1beta.Empty{} - err = client.Call(destroy, pipe, er) - assert.NoError(t, err) - } -} - -func enableProxy(name string, t *testing.T) { - buf := new(bytes.Buffer) - buf.WriteString(`{"enabled":true}`) - - resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func disableProxy(name string, t *testing.T) { - buf := new(bytes.Buffer) - buf.WriteString(`{"enabled":false}`) - - resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func deleteProxy(name string, t *testing.T) { - client := &http.Client{} - - req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:8474/proxies/"+name, nil) //nolint:noctx - require.NoError(t, err) - - resp, err := client.Do(req) - require.NoError(t, err) - - require.NoError(t, err) - require.Equal(t, 204, resp.StatusCode) - if resp.Body != nil { - _ = resp.Body.Close() - } -} - -func stats(state *jobState.State) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - st := &jobsv1beta.Stats{} - er := &jobsv1beta.Empty{} - - err = client.Call(stat, er, st) - require.NoError(t, err) - require.NotNil(t, st) - - state.Queue = st.Stats[0].Queue - state.Pipeline = st.Stats[0].Pipeline - state.Driver = st.Stats[0].Driver - state.Active = st.Stats[0].Active - state.Delayed = st.Stats[0].Delayed - state.Reserved = st.Stats[0].Reserved - state.Ready = st.Stats[0].Ready - } -} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go deleted file mode 100644 index 949698ec..00000000 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ /dev/null @@ -1,499 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/amqp" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestAMQPInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "amqp/.rr-amqp-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestAMQPDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "amqp/.rr-amqp-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareAMQPPipeline", declareAMQPPipe) - t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) - t.Run("PushAMQPPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseAMQPPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestAMQPJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "amqp/.rr-amqp-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareAMQPPipeline", declareAMQPPipe) - t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) - t.Run("PushAMQPPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PauseAMQPPipeline", pausePipelines("test-3")) - t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestAMQPNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "amqp/.rr-no-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func TestAMQPStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "amqp/.rr-amqp-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareAMQPPipeline", declareAMQPPipe) - t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) - t.Run("PushAMQPPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 2) - t.Run("PauseAMQPPipeline", pausePipelines("test-3")) - time.Sleep(time.Second * 2) - t.Run("PushAMQPPipeline", pushToPipe("test-3")) - t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) - - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "amqp") - assert.Equal(t, out.Queue, "default") - - assert.Equal(t, int64(1), out.Active) - assert.Equal(t, int64(1), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - assert.Equal(t, false, out.Ready) - - time.Sleep(time.Second) - t.Run("ResumePipeline", resumePipes("test-3")) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "amqp") - assert.Equal(t, out.Queue, "default") - - assert.Equal(t, int64(0), out.Active) - assert.Equal(t, int64(0), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - assert.Equal(t, true, out.Ready) - - time.Sleep(time.Second) - t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func declareAMQPPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "amqp", - "name": "test-3", - "routing_key": "test-3", - "queue": "default", - "exchange_type": "direct", - "exchange": "amqp.default", - "prefetch": "100", - "priority": "3", - "exclusive": "true", - "multiple_ask": "true", - "requeue_on_fail": "true", - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go deleted file mode 100644 index 9f4d37ec..00000000 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ /dev/null @@ -1,515 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestBeanstalkInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "beanstalk/.rr-beanstalk-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestBeanstalkDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "beanstalk/.rr-beanstalk-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) - t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) - t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestBeanstalkJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "beanstalk/.rr-beanstalk-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) - t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) - t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestBeanstalkStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "beanstalk/.rr-beanstalk-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareBeanstalkPipe) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 2) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second * 3) - t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "beanstalk") - assert.Equal(t, out.Queue, "default") - - // try 5 times - if out.Active == 0 { - for i := 0; i < 5; i++ { - time.Sleep(time.Second) - out = &jobState.State{} - t.Run("Stats", stats(out)) - if out.Active == 1 { - break - } - } - } - - assert.Equal(t, int64(1), out.Active) - assert.Equal(t, int64(1), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - - time.Sleep(time.Second) - t.Run("ResumePipeline", resumePipes("test-3")) - time.Sleep(time.Second * 15) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "beanstalk") - assert.Equal(t, out.Queue, "default") - - assert.Equal(t, int64(0), out.Active) - assert.Equal(t, int64(0), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - - time.Sleep(time.Second) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second) - stopCh <- struct{}{} - wg.Wait() -} - -func TestBeanstalkNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "beanstalk/.rr-no-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func declareBeanstalkPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "beanstalk", - "name": "test-3", - "tube": "default", - "reserve_timeout": "1", - "priority": "3", - "tube_priority": "10", - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go deleted file mode 100644 index ab36ffa4..00000000 --- a/tests/plugins/jobs/jobs_boltdb_test.go +++ /dev/null @@ -1,506 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/boltdb" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - rr1db string = "rr1.db" - rr2db string = "rr2.db" -) - -func TestBoltDBInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "boltdb/.rr-boltdb-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("boltdb listener stopped").Times(4) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &boltdb.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() - - assert.NoError(t, os.Remove(rr1db)) - assert.NoError(t, os.Remove(rr2db)) -} - -func TestBoltDBDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "boltdb/.rr-boltdb-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &boltdb.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() - assert.NoError(t, os.Remove(rr1db)) -} - -func TestBoltDBJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "boltdb/.rr-boltdb-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &boltdb.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PausePipeline", pausePipelines("test-3")) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() - assert.NoError(t, os.Remove(rr1db)) -} - -func TestBoltDBNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "boltdb/.rr-no-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &boltdb.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func TestBoltDBStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "boltdb/.rr-boltdb-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &boltdb.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 2) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second * 2) - t.Run("PushPipeline", pushToPipe("test-3")) - t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) - - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, "test-3", out.Pipeline) - assert.Equal(t, "boltdb", out.Driver) - assert.Equal(t, "push", out.Queue) - - assert.Equal(t, int64(1), out.Active) - assert.Equal(t, int64(1), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - assert.Equal(t, false, out.Ready) - - time.Sleep(time.Second) - t.Run("ResumePipeline", resumePipes("test-3")) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, "test-3", out.Pipeline) - assert.Equal(t, "boltdb", out.Driver) - assert.Equal(t, "push", out.Queue) - - assert.Equal(t, int64(0), out.Active) - assert.Equal(t, int64(0), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - assert.Equal(t, true, out.Ready) - - time.Sleep(time.Second) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() - assert.NoError(t, os.Remove(rr1db)) -} - -func declareBoltDBPipe(file string) func(t *testing.T) { - return func(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "boltdb", - "name": "test-3", - "prefetch": "100", - "priority": "3", - "file": file, - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) - } -} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go deleted file mode 100644 index 5c521c2b..00000000 --- a/tests/plugins/jobs/jobs_general_test.go +++ /dev/null @@ -1,249 +0,0 @@ -package jobs - -import ( - "io/ioutil" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/amqp" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/memory" - "github.com/spiral/roadrunner/v2/plugins/metrics" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func TestJobsInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "configs/.rr-jobs-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - &amqp.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestJOBSMetrics(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - cfg := &config.Viper{} - cfg.Prefix = "rr" - cfg.Path = "configs/.rr-jobs-metrics.yaml" - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &rpcPlugin.Plugin{}, - &server.Plugin{}, - &jobs.Plugin{}, - &metrics.Plugin{}, - &memory.Plugin{}, - mockLogger, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - tt := time.NewTimer(time.Minute * 3) - wg := &sync.WaitGroup{} - wg.Add(1) - - go func() { - defer tt.Stop() - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 2) - - t.Run("DeclareEphemeralPipeline", declareMemoryPipe) - t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - time.Sleep(time.Second) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 5) - - genericOut, err := get() - assert.NoError(t, err) - - assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`) - assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`) - assert.Contains(t, genericOut, `rr_jobs_push_err 0`) - assert.Contains(t, genericOut, `rr_jobs_push_ok 3`) - assert.Contains(t, genericOut, "workers_memory_bytes") - - close(sig) - wg.Wait() -} - -const getAddr = "http://127.0.0.1:2112/metrics" - -// get request and return body -func get() (string, error) { - r, err := http.Get(getAddr) - if err != nil { - return "", err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return "", err - } - - err = r.Body.Close() - if err != nil { - return "", err - } - // unsafe - return string(b), err -} diff --git a/tests/plugins/jobs/jobs_memory_test.go b/tests/plugins/jobs/jobs_memory_test.go deleted file mode 100644 index 7e39c556..00000000 --- a/tests/plugins/jobs/jobs_memory_test.go +++ /dev/null @@ -1,570 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/memory" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func TestMemoryInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "memory/.rr-memory-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 1) - stopCh <- struct{}{} - wg.Wait() -} - -func TestMemoryDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "memory/.rr-memory-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareMemoryPipe) - t.Run("ConsumePipeline", consumeMemoryPipe) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestMemoryPauseResume(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "memory/.rr-memory-pause-resume.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - ) - - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("Pause", pausePipelines("test-local")) - t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("Resume", resumePipes("test-local")) - t.Run("pushToEnabledPipe", pushToPipe("test-local")) - time.Sleep(time.Second * 1) - - stopCh <- struct{}{} - time.Sleep(time.Second) - wg.Wait() -} - -func TestMemoryJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "memory/.rr-memory-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareMemoryPipe) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestMemoryStats(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "memory/.rr-memory-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &memory.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareMemoryPipe) - t.Run("ConsumePipeline", consumeMemoryPipe) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - - t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushPipeline", pushToPipe("test-3")) - - time.Sleep(time.Second) - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "memory") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(1)) - assert.Equal(t, out.Delayed, int64(1)) - assert.Equal(t, out.Reserved, int64(0)) - - time.Sleep(time.Second) - t.Run("ConsumePipeline", consumeMemoryPipe) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "memory") - assert.Equal(t, out.Queue, "test-3") - - assert.Equal(t, out.Active, int64(0)) - assert.Equal(t, out.Delayed, int64(0)) - assert.Equal(t, out.Reserved, int64(0)) - - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func declareMemoryPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "memory", - "name": "test-3", - "prefetch": "10000", - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} - -func consumeMemoryPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} - pipe.GetPipelines()[0] = "test-3" - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Resume", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go deleted file mode 100644 index 2dd2c8db..00000000 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ /dev/null @@ -1,503 +0,0 @@ -package jobs - -import ( - "net" - "net/rpc" - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - jobState "github.com/spiral/roadrunner/v2/pkg/state/job" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/sqs" - jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSQSInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "sqs/.rr-sqs-init.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - stopCh <- struct{}{} - wg.Wait() -} - -func TestSQSDeclare(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "sqs/.rr-sqs-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroySQSPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func TestSQSJobsError(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "sqs/.rr-sqs-jobs-err.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) - time.Sleep(time.Second * 25) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - t.Run("DestroySQSPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() - - time.Sleep(time.Second * 5) -} - -func TestSQSNoGlobalSection(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "sqs/.rr-no-global.yaml", - Prefix: "rr", - } - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - _, err = cont.Serve() - require.Error(t, err) -} - -func TestSQSStat(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - assert.NoError(t, err) - - cfg := &config.Viper{ - Path: "sqs/.rr-sqs-declare.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - - t.Run("DeclarePipeline", declareSQSPipe) - t.Run("ConsumePipeline", resumePipes("test-3")) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - t.Run("PausePipeline", pausePipelines("test-3")) - time.Sleep(time.Second) - - t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushPipeline", pushToPipe("test-3")) - time.Sleep(time.Second) - - out := &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "sqs") - assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") - - assert.Equal(t, int64(1), out.Active) - assert.Equal(t, int64(1), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - - time.Sleep(time.Second) - t.Run("ResumePipeline", resumePipes("test-3")) - time.Sleep(time.Second * 7) - - out = &jobState.State{} - t.Run("Stats", stats(out)) - - assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "sqs") - assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default") - - assert.Equal(t, int64(0), out.Active) - assert.Equal(t, int64(0), out.Delayed) - assert.Equal(t, int64(0), out.Reserved) - - t.Run("DestroyPipeline", destroyPipelines("test-3")) - - time.Sleep(time.Second * 5) - stopCh <- struct{}{} - wg.Wait() -} - -func declareSQSPipe(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6001") - assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - - pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "sqs", - "name": "test-3", - "queue": "default", - "prefetch": "10", - "priority": "3", - "visibility_timeout": "0", - "wait_time_seconds": "3", - "tags": `{"key":"value"}`, - }} - - er := &jobsv1beta.Empty{} - err = client.Call("jobs.Declare", pipe, er) - assert.NoError(t, err) -} diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go deleted file mode 100644 index 84fbec48..00000000 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ /dev/null @@ -1,400 +0,0 @@ -package jobs - -import ( - "os" - "os/signal" - "sync" - "syscall" - "testing" - "time" - - toxiproxy "github.com/Shopify/toxiproxy/client" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/amqp" - "github.com/spiral/roadrunner/v2/plugins/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/informer" - "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/resetter" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/sqs" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDurabilityAMQP(t *testing.T) { - client := toxiproxy.NewClient("127.0.0.1:8474") - proxies, err := client.Proxies() - require.NoError(t, err) - - for p := range proxies { - _ = proxies[p].Delete() - } - - proxy, err := client.CreateProxy("redial", "127.0.0.1:23679", "127.0.0.1:5672") - require.NoError(t, err) - defer func() { - _ = proxy.Delete() - }() - - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - require.NoError(t, err) - - cfg := &config.Viper{ - Path: "durability/.rr-amqp-durability-redial.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-1", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-2", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - - mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) - - // redial errors - mockLogger.EXPECT().Warn("rabbitmq reconnecting, caused by", "error", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("rabbitmq dial succeed. trying to redeclare queues and subscribers").AnyTimes() - mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &amqp.Plugin{}, - ) - require.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - disableProxy("redial", t) - time.Sleep(time.Second * 3) - - go func() { - time.Sleep(time.Second * 5) - enableProxy("redial", t) - }() - - t.Run("PushPipelineWhileRedialing-1", pushToPipeErr("test-1")) - t.Run("PushPipelineWhileRedialing-2", pushToPipeErr("test-2")) - - time.Sleep(time.Second * 15) - t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) - t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) - - time.Sleep(time.Second * 5) - - stopCh <- struct{}{} - wg.Wait() -} - -func TestDurabilitySQS(t *testing.T) { - client := toxiproxy.NewClient("127.0.0.1:8474") - - _, err := client.CreateProxy("redial", "127.0.0.1:19324", "127.0.0.1:9324") - require.NoError(t, err) - defer deleteProxy("redial", t) - - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - require.NoError(t, err) - - cfg := &config.Viper{ - Path: "durability/.rr-sqs-durability-redial.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // redial errors - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - - // stop - mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes() - mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &sqs.Plugin{}, - ) - require.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - disableProxy("redial", t) - time.Sleep(time.Second * 3) - - go func() { - time.Sleep(time.Second) - t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) - time.Sleep(time.Second) - t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) - }() - - time.Sleep(time.Second * 5) - enableProxy("redial", t) - - t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) - t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) - - time.Sleep(time.Second * 10) - - stopCh <- struct{}{} - wg.Wait() -} - -func TestDurabilityBeanstalk(t *testing.T) { - client := toxiproxy.NewClient("127.0.0.1:8474") - - _, err := client.CreateProxy("redial", "127.0.0.1:11400", "127.0.0.1:11300") - require.NoError(t, err) - defer deleteProxy("redial", t) - - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - require.NoError(t, err) - - cfg := &config.Viper{ - Path: "durability/.rr-beanstalk-durability-redial.yaml", - Prefix: "rr", - } - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - // general - mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - - mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) - // redial errors - mockLogger.EXPECT().Info("beanstalk redial was successful").MinTimes(2) - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() - - err = cont.RegisterAll( - cfg, - &server.Plugin{}, - &rpcPlugin.Plugin{}, - mockLogger, - &jobs.Plugin{}, - &resetter.Plugin{}, - &informer.Plugin{}, - &beanstalk.Plugin{}, - ) - require.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - if err != nil { - t.Fatal(err) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - time.Sleep(time.Second * 3) - disableProxy("redial", t) - time.Sleep(time.Second * 3) - - go func() { - time.Sleep(time.Second * 2) - t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) - t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) - }() - - time.Sleep(time.Second * 5) - enableProxy("redial", t) - - t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) - t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) - - time.Sleep(time.Second * 10) - - stopCh <- struct{}{} - wg.Wait() -} diff --git a/tests/plugins/jobs/memory/.rr-memory-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml deleted file mode 100644 index 726c24ac..00000000 --- a/tests/plugins/jobs/memory/.rr-memory-declare.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/memory/.rr-memory-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml deleted file mode 100644 index 9ee8afc2..00000000 --- a/tests/plugins/jobs/memory/.rr-memory-init.yaml +++ /dev/null @@ -1,37 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: memory - priority: 10 - prefetch: 10000 - - test-2: - driver: memory - priority: 10 - prefetch: 10000 - - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml deleted file mode 100644 index 05dc3ffa..00000000 --- a/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml +++ /dev/null @@ -1,21 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml deleted file mode 100644 index 1ad48237..00000000 --- a/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml +++ /dev/null @@ -1,44 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: debug - mode: development - -jobs: - # num logical cores by default - num_pollers: 10 - # 1mi by default - pipeline_size: 100000 - # worker pool configuration - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - # list of broker pipelines associated with endpoints - pipelines: - test-local: - driver: memory - priority: 10 - pipeline_size: 10000 - - test-local-2: - driver: memory - priority: 1 - pipeline_size: 10000 - - test-local-3: - driver: memory - priority: 2 - pipeline_size: 10000 - - # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-local-2" ] - diff --git a/tests/plugins/jobs/sqs/.rr-no-global.yaml b/tests/plugins/jobs/sqs/.rr-no-global.yaml deleted file mode 100644 index 2c97a37e..00000000 --- a/tests/plugins/jobs/sqs/.rr-no-global.yaml +++ /dev/null @@ -1,39 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -logs: - level: error - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: sqs - prefetch: 1000 - visibility_timeout: 0 - wait_time_seconds: 0 - queue: default - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 30 - tags: - test: "tag" - - consume: [ "test-1" ] - diff --git a/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml b/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml deleted file mode 100644 index 21209cbb..00000000 --- a/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml +++ /dev/null @@ -1,29 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_ok.php" - relay: "pipes" - relay_timeout: "20s" - -# amazon sqs configuration -# General section -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://127.0.0.1:9324 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 1 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml deleted file mode 100644 index ffdec1fd..00000000 --- a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml +++ /dev/null @@ -1,54 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../client.php echo pipes" - relay: "pipes" - relay_timeout: "20s" - -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://127.0.0.1:9324 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s - - pipelines: - test-1: - driver: sqs - prefetch: 1000 - visibility_timeout: 0 - wait_time_seconds: 0 - queue: default - attributes: - DelaySeconds: 0 - MaximumMessageSize: 262144 - MessageRetentionPeriod: 345600 - ReceiveMessageWaitTimeSeconds: 0 - VisibilityTimeout: 30 - tags: - test: "tag" - - test-2: - driver: sqs - prefetch: 1000 - queue: default-2 - attributes: - MessageRetentionPeriod: 86400 - tags: - test: "tag" - consume: [ "test-1", "test-2" ] - diff --git a/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml b/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml deleted file mode 100644 index b518d433..00000000 --- a/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml +++ /dev/null @@ -1,28 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - -server: - command: "php ../../jobs_err.php" - relay: "pipes" - relay_timeout: "20s" - -sqs: - key: api-key - secret: api-secret - region: us-west-1 - endpoint: http://127.0.0.1:9324 - -logs: - level: debug - encoding: console - mode: development - -jobs: - num_pollers: 10 - timeout: 60 - pipeline_size: 100000 - pool: - num_workers: 10 - max_jobs: 0 - allocate_timeout: 60s - destroy_timeout: 60s |