diff options
Diffstat (limited to 'tests')
24 files changed, 356 insertions, 128 deletions
diff --git a/tests/composer.json b/tests/composer.json index 50178d1f..fa5925b7 100644 --- a/tests/composer.json +++ b/tests/composer.json @@ -2,7 +2,7 @@ "minimum-stability": "beta", "prefer-stable": true, "require": { - "nyholm/psr7": "^1.3", + "nyholm/psr7": "^1.4", "spiral/roadrunner": "^2.0", "spiral/roadrunner-http": "^2.0", "temporal/sdk": ">=1.0", diff --git a/tests/docker-compose-jobs.yml b/tests/docker-compose-jobs.yml new file mode 100644 index 00000000..7b88c9cf --- /dev/null +++ b/tests/docker-compose-jobs.yml @@ -0,0 +1,22 @@ +version: "3" + +services: + beanstalk: + image: schickling/beanstalkd + ports: + - "11300:11300" + + sqs: + image: vsouza/sqs-local + ports: + - "9324:9324" + + rabbitmq: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + RABBITMQ_DEFAULT_VHOST: / + ports: + - "15672:15672" + - "5672:5672"
\ No newline at end of file diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 0ec813f3..d8bedf29 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -279,14 +279,15 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -299,11 +300,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -389,10 +390,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml index d8daa251..66114d64 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml index 2ca97055..ea25988c 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -38,9 +36,4 @@ broadcast: logs: mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml index 360e05e5..cbe18196 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -35,9 +33,4 @@ broadcast: logs: mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index d3b16256..01ad1479 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + } } }() @@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index 2bd819d2..ee072ffe 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,13 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + exit chan struct{} } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + } } }() @@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index ef926222..288201d1 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + } } }() @@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index c9b94777..56f79c0f 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index 01562a8f..e7cd7e60 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 76f2d6e8..08272196 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml index 9d2ef7e5..b4e960f1 100644 --- a/tests/plugins/headers/configs/.rr-cors-headers.yaml +++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml @@ -1,9 +1,5 @@ server: command: "php ../../http/client.php headers pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml index 99358b04..4ea8ec73 100644 --- a/tests/plugins/http/configs/.rr-env.yaml +++ b/tests/plugins/http/configs/.rr-env.yaml @@ -3,17 +3,13 @@ rpc: server: command: "php ../../http/client.php env pipes" - user: "" - group: "" - env: - "env_key": "ENV_VALUE" relay: "pipes" relay_timeout: "20s" http: address: 127.0.0.1:12084 max_request_size: 1024 - middleware: [ "" ] + middleware: [] env: "RR_HTTP": "true" "env_key": "ENV_VALUE" diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 40e3a720..37d9452c 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index df696668..d02f9eee 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 43335999..62816d02 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -10,7 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml new file mode 100644 index 00000000..b21f764c --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -0,0 +1,57 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + relay: "pipes" + relay_timeout: "20s" + +jobs: + # worker pool configuration + pool: + num_workers: 4 + + # rabbitmq and similar servers + amqp: + addr: amqp://guest:guest@localhost:5672/ + + # beanstalk configuration + beanstalk: + addr: tcp://localhost:11300 + + # amazon sqs configuration + sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://localhost:9324 + + # job destinations and options + dispatch: + spiral-jobs-tests-amqp-*.pipeline: amqp + spiral-jobs-tests-local-*.pipeline: local + spiral-jobs-tests-beanstalk-*.pipeline: beanstalk + spiral-jobs-tests-sqs-*.pipeline: sqs + + # list of broker pipelines associated with endpoints + pipelines: + local: + broker: ephemeral + + amqp: + broker: amqp + queue: default + + beanstalk: + broker: beanstalk + tube: default + + sqs: + broker: sqs + queue: default + declare: + MessageRetentionPeriod: 86400 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: ["local", "amqp", "beanstalk", "sqs"] + diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go new file mode 100644 index 00000000..e8b4e83d --- /dev/null +++ b/tests/plugins/jobs/jobs_plugin_test.go @@ -0,0 +1,90 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/brokers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/stretchr/testify/assert" +) + +func TestJobsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-jobs-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + + wg.Wait() +} diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go index 61942516..5c26cbd0 100644 --- a/tests/plugins/resetter/test_plugin.go +++ b/tests/plugins/resetter/test_plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = &poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index f1c13734..e813e456 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -15,7 +15,7 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php index d0c72eae..b4a028d4 100644 --- a/tests/psr-worker-bench.php +++ b/tests/psr-worker-bench.php @@ -56,4 +56,4 @@ if ($env->getMode() === 'http') { } $factory->run(); -}
\ No newline at end of file +} diff --git a/tests/psr-worker.php b/tests/psr-worker.php index db53eee2..de4befbc 100644 --- a/tests/psr-worker.php +++ b/tests/psr-worker.php @@ -20,7 +20,7 @@ while ($req = $psr7->waitRequest()) { try { $resp = new \Nyholm\Psr7\Response(); $resp->getBody()->write(str_repeat("hello world", 1000)); - + $psr7->respond($resp); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); diff --git a/tests/worker-cors.php b/tests/worker-cors.php new file mode 100644 index 00000000..ea3c986c --- /dev/null +++ b/tests/worker-cors.php @@ -0,0 +1,15 @@ +<?php + +use Spiral\RoadRunner\Worker; +use Spiral\RoadRunner\Http\HttpWorker; + +ini_set('display_errors', 'stderr'); +require __DIR__ . '/vendor/autoload.php'; + +$http = new HttpWorker(Worker::create()); + +while ($req = $http->waitRequest()) { + $http->respond(200, 'Response', [ + 'Access-Control-Allow-Origin' => ['*'] + ]); +} |