diff options
author | Valery Piashchynski <[email protected]> | 2021-08-12 15:38:19 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-12 15:38:19 +0300 |
commit | df27287c78d7b17d7c8f0e7fff59fa7cbf2a4f9f (patch) | |
tree | df0749155487eae6bcdbb2456885131a21916f4d /tests/plugins | |
parent | 67db4b5f7b66e9a32713133baed83c3ab7146bb8 (diff) | |
parent | ecbfc5c5265a9895f4e371ce4388f64df8714e63 (diff) |
#726: feat(plugin): new `jobs` plugin
#726: feat(plugin): new `jobs` plugin
Diffstat (limited to 'tests/plugins')
74 files changed, 3345 insertions, 246 deletions
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 0ec813f3..d8bedf29 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) { } func TestBroadcastSameSubscriber(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -279,14 +279,15 @@ func TestBroadcastSameSubscriber(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + + time.Sleep(time.Second * 5) } func TestBroadcastSameSubscriberGlobal(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second)) assert.NoError(t, err) cfg := &config.Viper{ @@ -299,11 +300,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) - mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes() + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes() mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) - mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2) mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) @@ -389,10 +390,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) { t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) - time.Sleep(time.Second * 4) stopCh <- struct{}{} wg.Wait() + time.Sleep(time.Second * 5) } func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml index d8daa251..66114d64 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml index 2ca97055..5ae5a101 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -21,7 +19,7 @@ http: redis: addrs: - - "localhost:6379" + - "127.0.0.1:6379" broadcast: test: @@ -29,7 +27,7 @@ broadcast: test2: driver: redis addrs: - - "localhost:6378" + - "127.0.0.1:6378" test3: driver: memory test4: @@ -38,9 +36,4 @@ broadcast: logs: mode: development - level: error - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml index aa80330e..d8457578 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -23,7 +23,7 @@ broadcast: default: driver: redis addrs: - - "localhost:6379" + - "127.0.0.1:6379" logs: mode: development diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml index 360e05e5..2337b8fe 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml @@ -3,8 +3,6 @@ rpc: server: command: "php ../../psr-worker-bench.php" - user: "" - group: "" relay: "pipes" relay_timeout: "20s" @@ -23,11 +21,11 @@ broadcast: test: driver: redis addrs: - - "localhost:6379" + - "127.0.0.1:6379" test2: driver: redis addrs: - - "localhost:6378" + - "127.0.0.1:6378" test3: driver: memory test4: @@ -35,9 +33,4 @@ broadcast: logs: mode: development - level: debug - -endure: - grace_period: 120s - print_graph: false - log_level: error + level: info diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go index d3b16256..01ad1479 100644 --- a/tests/plugins/broadcast/plugins/plugin1.go +++ b/tests/plugins/broadcast/plugins/plugin1.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,14 @@ type Plugin1 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg)) + } } }() @@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error { _ = p.driver.Unsubscribe("1", "foo") _ = p.driver.Unsubscribe("1", "foo2") _ = p.driver.Unsubscribe("1", "foo3") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go index 2bd819d2..ee072ffe 100644 --- a/tests/plugins/broadcast/plugins/plugin2.go +++ b/tests/plugins/broadcast/plugins/plugin2.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,13 @@ type Plugin2 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + exit chan struct{} } func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg)) + } } }() @@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error { func (p *Plugin2) Stop() error { _ = p.driver.Unsubscribe("2", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go index ef926222..288201d1 100644 --- a/tests/plugins/broadcast/plugins/plugin3.go +++ b/tests/plugins/broadcast/plugins/plugin3.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin3 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg)) + } } }() @@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error { func (p *Plugin3) Stop() error { _ = p.driver.Unsubscribe("3", "foo") + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go index c9b94777..56f79c0f 100644 --- a/tests/plugins/broadcast/plugins/plugin4.go +++ b/tests/plugins/broadcast/plugins/plugin4.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin4 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error { func (p *Plugin4) Stop() error { _ = p.driver.Unsubscribe("4", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go index 01562a8f..e7cd7e60 100644 --- a/tests/plugins/broadcast/plugins/plugin5.go +++ b/tests/plugins/broadcast/plugins/plugin5.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin5 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error { func (p *Plugin5) Stop() error { _ = p.driver.Unsubscribe("5", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go index 76f2d6e8..08272196 100644 --- a/tests/plugins/broadcast/plugins/plugin6.go +++ b/tests/plugins/broadcast/plugins/plugin6.go @@ -3,7 +3,7 @@ package plugins import ( "fmt" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -14,11 +14,15 @@ type Plugin6 struct { log logger.Logger b broadcast.Broadcaster driver pubsub.SubReader + + exit chan struct{} } func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error { p.log = log p.b = b + + p.exit = make(chan struct{}, 1) return nil } @@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error { go func() { for { - msg, err := p.driver.Next() - if err != nil { - panic(err) - } + select { + case <-p.exit: + return + default: + msg, err := p.driver.Next() + if err != nil { + errCh <- err + return + } - if msg == nil { - continue - } + if msg == nil { + continue + } - p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg)) + } } }() @@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error { func (p *Plugin6) Stop() error { _ = p.driver.Unsubscribe("6", "foo") + + p.exit <- struct{}{} return nil } diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go index b6063cec..87ab1eaa 100755 --- a/tests/plugins/config/config_test.go +++ b/tests/plugins/config/config_test.go @@ -97,7 +97,7 @@ func TestConfigOverwriteValid(t *testing.T) { vp := &config.Viper{} vp.Path = "configs/.rr.yaml" vp.Prefix = "rr" - vp.Flags = []string{"rpc.listen=tcp://localhost:36643"} + vp.Flags = []string{"rpc.listen=tcp://127.0.0.1:36643"} err = container.RegisterAll( &logger.ZapLogger{}, @@ -143,7 +143,7 @@ func TestConfigEnvVariables(t *testing.T) { t.Fatal(err) } - err = os.Setenv("SUPER_RPC_ENV", "tcp://localhost:36643") + err = os.Setenv("SUPER_RPC_ENV", "tcp://127.0.0.1:36643") assert.NoError(t, err) vp := &config.Viper{} @@ -194,7 +194,7 @@ func TestConfigEnvVariablesFail(t *testing.T) { t.Fatal(err) } - err = os.Setenv("SUPER_RPC_ENV", "tcp://localhost:6065") + err = os.Setenv("SUPER_RPC_ENV", "tcp://127.0.0.1:6065") assert.NoError(t, err) vp := &config.Viper{} diff --git a/tests/plugins/config/configs/.rr.yaml b/tests/plugins/config/configs/.rr.yaml index f449dcf3..575cdd33 100755 --- a/tests/plugins/config/configs/.rr.yaml +++ b/tests/plugins/config/configs/.rr.yaml @@ -1,5 +1,5 @@ rpc: - listen: tcp://localhost:6060 + listen: tcp://127.0.0.1:6060 logs: mode: development diff --git a/tests/plugins/config/plugin1.go b/tests/plugins/config/plugin1.go index 1de9a02e..08a48a4f 100755 --- a/tests/plugins/config/plugin1.go +++ b/tests/plugins/config/plugin1.go @@ -83,7 +83,7 @@ func (f *Foo) Serve() chan error { return errCh } - if allCfg.RPC.Listen != "tcp://localhost:6060" { + if allCfg.RPC.Listen != "tcp://127.0.0.1:6060" { errCh <- errors.E(op, errors.Str("RPC.Listen should be parsed")) return errCh } diff --git a/tests/plugins/config/plugin2.go b/tests/plugins/config/plugin2.go index 9639b170..8c6f36c1 100755 --- a/tests/plugins/config/plugin2.go +++ b/tests/plugins/config/plugin2.go @@ -37,7 +37,7 @@ func (f *Foo2) Serve() chan error { return errCh } - if allCfg.RPC.Listen != "tcp://localhost:36643" { + if allCfg.RPC.Listen != "tcp://127.0.0.1:36643" { errCh <- errors.E(op, errors.Str("RPC.Listen should be overwritten")) return errCh } diff --git a/tests/plugins/gzip/plugin_test.go b/tests/plugins/gzip/plugin_test.go index b254fad5..5612ec94 100644 --- a/tests/plugins/gzip/plugin_test.go +++ b/tests/plugins/gzip/plugin_test.go @@ -89,7 +89,7 @@ func TestGzipPlugin(t *testing.T) { } func headerCheck(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:18953", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:18953", nil) assert.NoError(t, err) client := &http.Client{ Transport: &http.Transport{ diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml index 9d2ef7e5..b4e960f1 100644 --- a/tests/plugins/headers/configs/.rr-cors-headers.yaml +++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml @@ -1,9 +1,5 @@ server: command: "php ../../http/client.php headers pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/headers/headers_plugin_test.go b/tests/plugins/headers/headers_plugin_test.go index 49d86b00..a03a3c34 100644 --- a/tests/plugins/headers/headers_plugin_test.go +++ b/tests/plugins/headers/headers_plugin_test.go @@ -154,7 +154,7 @@ func TestRequestHeaders(t *testing.T) { } func reqHeaders(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:22655?hello=value", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:22655?hello=value", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -239,7 +239,7 @@ func TestResponseHeaders(t *testing.T) { } func resHeaders(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:22455?hello=value", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:22455?hello=value", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -326,7 +326,7 @@ func TestCORSHeaders(t *testing.T) { } func corsHeadersPass(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:22855", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:22855", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -346,7 +346,7 @@ func corsHeadersPass(t *testing.T) { } func corsHeaders(t *testing.T) { - req, err := http.NewRequest("OPTIONS", "http://localhost:22855", nil) + req, err := http.NewRequest("OPTIONS", "http://127.0.0.1:22855", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml index 99358b04..4ea8ec73 100644 --- a/tests/plugins/http/configs/.rr-env.yaml +++ b/tests/plugins/http/configs/.rr-env.yaml @@ -3,17 +3,13 @@ rpc: server: command: "php ../../http/client.php env pipes" - user: "" - group: "" - env: - "env_key": "ENV_VALUE" relay: "pipes" relay_timeout: "20s" http: address: 127.0.0.1:12084 max_request_size: 1024 - middleware: [ "" ] + middleware: [] env: "RR_HTTP": "true" "env_key": "ENV_VALUE" diff --git a/tests/plugins/http/configs/.rr-http.yaml b/tests/plugins/http/configs/.rr-http.yaml index c95bc049..b4910160 100644 --- a/tests/plugins/http/configs/.rr-http.yaml +++ b/tests/plugins/http/configs/.rr-http.yaml @@ -3,10 +3,6 @@ rpc: server: command: "php ../../http/client.php echo pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 40e3a720..c8709678 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -56,7 +56,7 @@ func TestHandler_Echo(t *testing.T) { }(hs) time.Sleep(time.Millisecond * 10) - body, r, err := get("http://localhost:8177/?hello=world") + body, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -111,7 +111,7 @@ func TestHandler_Headers(t *testing.T) { }() time.Sleep(time.Millisecond * 100) - req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:8078?hello=world", nil) assert.NoError(t, err) req.Header.Add("input", "sample") @@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -172,7 +172,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:19658?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:19658?hello=world", nil) assert.NoError(t, err) req.Header.Add("user-agent", "") @@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -232,7 +232,7 @@ func TestHandler_User_Agent(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:25688?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:25688?hello=world", nil) assert.NoError(t, err) req.Header.Add("User-Agent", "go-agent") @@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -292,7 +292,7 @@ func TestHandler_Cookies(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("GET", "http://localhost:8079", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:8079", nil) assert.NoError(t, err) req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"}) @@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -359,7 +359,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { req, err := http.NewRequest( "POST", - "http://localhost"+hs.Addr, + "http://127.0.0.1"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`), ) assert.NoError(t, err) @@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -421,7 +421,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + req, err := http.NewRequest("PUT", "http://127.0.0.1"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) assert.NoError(t, err) req.Header.Add("Content-Type", "application/json") @@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -481,7 +481,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + req, err := http.NewRequest("PATCH", "http://127.0.0.1"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) assert.NoError(t, err) req.Header.Add("Content-Type", "application/json") @@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -552,7 +552,7 @@ func TestHandler_FormData_POST(t *testing.T) { form.Add("arr[c]p", "l") form.Add("arr[c]z", "") - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, strings.NewReader(form.Encode())) assert.NoError(t, err) req.Header.Add("Content-Type", "application/x-www-form-urlencoded") @@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -626,7 +626,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { form.Add("arr[c]p", "l") form.Add("arr[c]z", "") - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, strings.NewReader(form.Encode())) assert.NoError(t, err) req.Header.Add("Content-Type", "application/x-www-form-urlencoded") @@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -698,7 +698,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { form.Add("arr[c]p", "l") form.Add("arr[c]z", "") - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, strings.NewReader(form.Encode())) assert.NoError(t, err) req.Header.Add("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8") @@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -770,7 +770,7 @@ func TestHandler_FormData_PUT(t *testing.T) { form.Add("arr[c]p", "l") form.Add("arr[c]z", "") - req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + req, err := http.NewRequest("PUT", "http://127.0.0.1"+hs.Addr, strings.NewReader(form.Encode())) assert.NoError(t, err) req.Header.Add("Content-Type", "application/x-www-form-urlencoded") @@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -842,7 +842,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { form.Add("arr[c]p", "l") form.Add("arr[c]z", "") - req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + req, err := http.NewRequest("PATCH", "http://127.0.0.1"+hs.Addr, strings.NewReader(form.Encode())) assert.NoError(t, err) req.Header.Add("Content-Type", "application/x-www-form-urlencoded") @@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -956,7 +956,7 @@ func TestHandler_Multipart_POST(t *testing.T) { t.Errorf("error closing the writer: error %v", err) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1070,7 +1070,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { t.Errorf("error closing the writer: error %v", err) } - req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("PUT", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1186,7 +1186,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { t.Errorf("error closing the writer: error %v", err) } - req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("PATCH", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1247,7 +1247,7 @@ func TestHandler_Error(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8177/?hello=world") + _, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1293,7 +1293,7 @@ func TestHandler_Error2(t *testing.T) { }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8177/?hello=world") + _, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1344,7 +1344,7 @@ func TestHandler_Error3(t *testing.T) { b2.Write([]byte(" ")) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, b2) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1409,7 +1409,7 @@ func TestHandler_ResponseDuration(t *testing.T) { } }) - body, r, err := get("http://localhost:8177/?hello=world") + body, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1470,7 +1470,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { } }) - body, r, err := get("http://localhost:8177/?hello=world") + body, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1530,7 +1530,7 @@ func TestHandler_ErrorDuration(t *testing.T) { } }) - _, r, err := get("http://localhost:8177/?hello=world") + _, r, err := get("http://127.0.0.1:8177/?hello=world") assert.NoError(t, err) defer func() { _ = r.Body.Close() @@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p, err := pool.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), - pool.Config{ + &pool.Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -1838,7 +1838,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { b.ReportAllocs() bb := "WORLD" for n := 0; n < b.N; n++ { - r, err := http.Get("http://localhost:8177/?hello=world") + r, err := http.Get("http://127.0.0.1:8177/?hello=world") if err != nil { b.Fail() } diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index c3949911..db62781f 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -251,7 +251,7 @@ func TestHTTPInformerReset(t *testing.T) { } func echoHTTP(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:10084?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:10084?hello=world", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -371,7 +371,7 @@ func TestSSL(t *testing.T) { } func sslNoRedirect(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:8085?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:8085?hello=world", nil) assert.NoError(t, err) r, err := sslClient.Do(req) @@ -393,7 +393,7 @@ func sslNoRedirect(t *testing.T) { } func sslEcho(t *testing.T) { - req, err := http.NewRequest("GET", "https://localhost:8893?hello=world", nil) + req, err := http.NewRequest("GET", "https://127.0.0.1:8893?hello=world", nil) assert.NoError(t, err) r, err := sslClient.Do(req) @@ -505,7 +505,7 @@ func TestSSLRedirect(t *testing.T) { } func sslRedirect(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:8087?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:8087?hello=world", nil) assert.NoError(t, err) r, err := sslClient.Do(req) @@ -593,7 +593,7 @@ func TestSSLPushPipes(t *testing.T) { } func sslPush(t *testing.T) { - req, err := http.NewRequest("GET", "https://localhost:8894?hello=world", nil) + req, err := http.NewRequest("GET", "https://127.0.0.1:8894?hello=world", nil) assert.NoError(t, err) r, err := sslClient.Do(req) @@ -604,7 +604,7 @@ func sslPush(t *testing.T) { b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) - assert.Equal(t, "", r.Header.Get("Http2-Push")) + assert.Equal(t, "", r.Header.Get("Http2-Release")) assert.NoError(t, err) assert.Equal(t, 201, r.StatusCode) @@ -864,7 +864,7 @@ func TestH2CUpgrade(t *testing.T) { } func h2cUpgrade(t *testing.T) { - req, err := http.NewRequest("PRI", "http://localhost:8083?hello=world", nil) + req, err := http.NewRequest("PRI", "http://127.0.0.1:8083?hello=world", nil) if err != nil { t.Fatal(err) } @@ -955,7 +955,7 @@ func TestH2C(t *testing.T) { } func h2c(t *testing.T) { - req, err := http.NewRequest("PRI", "http://localhost:8083?hello=world", nil) + req, err := http.NewRequest("PRI", "http://127.0.0.1:8083?hello=world", nil) if err != nil { t.Fatal(err) } @@ -1047,7 +1047,7 @@ func TestHttpMiddleware(t *testing.T) { } func middleware(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:18903?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:18903?hello=world", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1062,7 +1062,7 @@ func middleware(t *testing.T) { err = r.Body.Close() assert.NoError(t, err) - req, err = http.NewRequest("GET", "http://localhost:18903/halt", nil) + req, err = http.NewRequest("GET", "http://127.0.0.1:18903/halt", nil) assert.NoError(t, err) r, err = http.DefaultClient.Do(req) @@ -1127,7 +1127,7 @@ logs: mockLogger.EXPECT().Debug(gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Debug("201 GET http://localhost:34999/?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("201 GET http://127.0.0.1:34999/?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("WORLD").MinTimes(1) mockLogger.EXPECT().Debug("worker event received", "event", events.EventWorkerLog, "worker state", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // placeholder for the workerlogerror @@ -1193,7 +1193,7 @@ logs: } func echoError(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:34999?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:34999?hello=world", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1278,7 +1278,7 @@ func TestHttpEnvVariables(t *testing.T) { } func envVarsTest(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:12084", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:12084", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1432,7 +1432,7 @@ func TestHTTPSupervisedPool(t *testing.T) { } func echoHTTP2(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:18888?hello=world", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:18888?hello=world", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1638,7 +1638,7 @@ func bigEchoHTTP(t *testing.T) { bt := bytes.NewBuffer(buf) - req, err := http.NewRequest("GET", "http://localhost:10085?hello=world", bt) + req, err := http.NewRequest("GET", "http://127.0.0.1:10085?hello=world", bt) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -1723,7 +1723,7 @@ func TestStaticEtagPlugin(t *testing.T) { func serveStaticSampleEtag(t *testing.T) { // OK 200 response - b, r, err := get("http://localhost:21603/sample.txt") + b, r, err := get("http://127.0.0.1:21603/sample.txt") assert.NoError(t, err) assert.Equal(t, "sample\n", b) assert.Equal(t, r.StatusCode, http.StatusOK) @@ -1736,7 +1736,7 @@ func serveStaticSampleEtag(t *testing.T) { Timeout: time.Second * 5, } - parsedURL, _ := url.Parse("http://localhost:21603/sample.txt") + parsedURL, _ := url.Parse("http://127.0.0.1:21603/sample.txt") req := &http.Request{ Method: http.MethodGet, @@ -1828,7 +1828,7 @@ func serveStaticSampleNotAllowedPath(t *testing.T) { parsedURL := &url.URL{ Scheme: "http", User: nil, - Host: "localhost:21603", + Host: "127.0.0.1:21603", Path: "%2e%2e%/tests/", } @@ -1845,7 +1845,7 @@ func serveStaticSampleNotAllowedPath(t *testing.T) { parsedURL = &url.URL{ Scheme: "http", User: nil, - Host: "localhost:21603", + Host: "127.0.0.1:21603", Path: "%2e%2e%5ctests/", } @@ -1862,7 +1862,7 @@ func serveStaticSampleNotAllowedPath(t *testing.T) { parsedURL = &url.URL{ Scheme: "http", User: nil, - Host: "localhost:21603", + Host: "127.0.0.1:21603", Path: "..%2ftests/", } @@ -1879,7 +1879,7 @@ func serveStaticSampleNotAllowedPath(t *testing.T) { parsedURL = &url.URL{ Scheme: "http", User: nil, - Host: "localhost:21603", + Host: "127.0.0.1:21603", Path: "%2e%2e%2ftests/", } @@ -1893,7 +1893,7 @@ func serveStaticSampleNotAllowedPath(t *testing.T) { assert.Equal(t, http.StatusBadRequest, resp.StatusCode) _ = resp.Body.Close() - _, r, err := get("http://localhost:21603/../../../../tests/../static/sample.txt") + _, r, err := get("http://127.0.0.1:21603/../../../../tests/../static/sample.txt") assert.NoError(t, err) assert.Equal(t, 403, r.StatusCode) _ = r.Body.Close() @@ -1971,7 +1971,7 @@ func TestStaticPlugin(t *testing.T) { } func staticHeaders(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:21603/client.php", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:21603/client.php", nil) if err != nil { t.Fatal(err) } @@ -1999,7 +1999,7 @@ func staticHeaders(t *testing.T) { } func staticNotForbid(t *testing.T) { - b, r, err := get("http://localhost:21603/client.php") + b, r, err := get("http://127.0.0.1:21603/client.php") assert.NoError(t, err) assert.Equal(t, all("../../../tests/client.php"), b) assert.Equal(t, all("../../../tests/client.php"), b) @@ -2007,7 +2007,7 @@ func staticNotForbid(t *testing.T) { } func serveStaticSample(t *testing.T) { - b, r, err := get("http://localhost:21603/sample.txt") + b, r, err := get("http://127.0.0.1:21603/sample.txt") assert.NoError(t, err) assert.Equal(t, "sample\n", b) _ = r.Body.Close() @@ -2104,7 +2104,7 @@ func TestStaticFilesDisabled(t *testing.T) { } func staticFilesDisabled(t *testing.T) { - b, r, err := get("http://localhost:45877/client.php?hello=world") + b, r, err := get("http://127.0.0.1:45877/client.php?hello=world") if err != nil { t.Fatal(err) } @@ -2126,9 +2126,9 @@ func TestStaticFilesForbid(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("201 GET http://localhost:34653/http?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Debug("201 GET http://localhost:34653/client.XXX?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) - mockLogger.EXPECT().Debug("201 GET http://localhost:34653/client.php?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("201 GET http://127.0.0.1:34653/http?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("201 GET http://127.0.0.1:34653/client.XXX?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("201 GET http://127.0.0.1:34653/client.php?hello=world", "remote", "127.0.0.1", "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Error("file open error", "error", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("no such file or directory", "error", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("possible path to dir provided").AnyTimes() @@ -2198,19 +2198,19 @@ func TestStaticFilesForbid(t *testing.T) { } func staticTestFilesDir(t *testing.T) { - b, r, err := get("http://localhost:34653/http?hello=world") + b, r, err := get("http://127.0.0.1:34653/http?hello=world") assert.NoError(t, err) assert.Equal(t, "WORLD", b) _ = r.Body.Close() } func staticNotFound(t *testing.T) { - b, _, _ := get("http://localhost:34653/client.XXX?hello=world") //nolint:bodyclose + b, _, _ := get("http://127.0.0.1:34653/client.XXX?hello=world") //nolint:bodyclose assert.Equal(t, "WORLD", b) } func staticFilesForbid(t *testing.T) { - b, r, err := get("http://localhost:34653/client.php?hello=world") + b, r, err := get("http://127.0.0.1:34653/client.php?hello=world") if err != nil { t.Fatal(err) } @@ -2288,7 +2288,7 @@ func TestHTTPIssue659(t *testing.T) { } func echoIssue659(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, "http://localhost:32552", nil) + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:32552", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 276c22ef..f754429d 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) + r, err := handler.NewResponse(&payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler.NewResponse(&payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index df696668..54f2bead 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -85,7 +85,7 @@ func TestHandler_Upload_File(t *testing.T) { t.Errorf("error closing the file: error %v", err) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -168,7 +168,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { t.Errorf("error closing the file: error %v", err) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -251,7 +251,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { t.Errorf("error closing the file: error %v", err) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) @@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { pool, err := poolImpl.Initialize(context.Background(), func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") }, pipe.NewPipeFactory(), - poolImpl.Config{ + &poolImpl.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 1000, DestroyTimeout: time.Second * 1000, @@ -334,7 +334,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { t.Errorf("error closing the file: error %v", err) } - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + req, err := http.NewRequest("POST", "http://127.0.0.1"+hs.Addr, &mb) assert.NoError(t, err) req.Header.Set("Content-Type", w.FormDataContentType()) diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go index 43335999..095140b8 100644 --- a/tests/plugins/informer/test_plugin.go +++ b/tests/plugins/informer/test_plugin.go @@ -10,7 +10,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, @@ -51,13 +51,13 @@ func (p1 *Plugin1) Name() string { func (p1 *Plugin1) Available() {} -func (p1 *Plugin1) Workers() []process.State { +func (p1 *Plugin1) Workers() []*process.State { p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) if err != nil { panic(err) } - ps := make([]process.State, 0, len(p.Workers())) + ps := make([]*process.State, 0, len(p.Workers())) workers := p.Workers() for i := 0; i < len(workers); i++ { state, err := process.WorkerProcessState(workers[i]) diff --git a/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml new file mode 100644 index 00000000..f9a7308b --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/amqp/.rr-amqp-init.yaml b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml new file mode 100644 index 00000000..43840545 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-init.yaml @@ -0,0 +1,55 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: amqp + prefetch: 100 + queue: test-1-queue + priority: 1 + exchange: default + exchange_type: direct + routing_key: test-1 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + test-2: + driver: amqp + prefetch: 100 + queue: test-2-queue + priority: 2 + exchange: default + exchange_type: direct + routing_key: test-2 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml new file mode 100644 index 00000000..79493d96 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-amqp-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/amqp/.rr-no-global.yaml b/tests/plugins/jobs/amqp/.rr-no-global.yaml new file mode 100644 index 00000000..1b01eb73 --- /dev/null +++ b/tests/plugins/jobs/amqp/.rr-no-global.yaml @@ -0,0 +1,47 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: amqp + priority: 1 + pipeline_size: 100 + queue: test-1-queue + exchange: default + exchange_type: direct + routing_key: test + + test-2: + driver: amqp + priority: 2 + pipeline_size: 100 + queue: test-2-queue + exchange: default + exchange_type: direct + routing_key: test-2 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml new file mode 100644 index 00000000..3555ef96 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-declare.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://127.0.0.1:11300 + # connect timeout + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml new file mode 100644 index 00000000..cf9069a8 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-init.yaml @@ -0,0 +1,45 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + addr: tcp://127.0.0.1:11300 + timeout: 10s + +logs: + level: info + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s + + test-2: + driver: beanstalk + priority: 11 + tube_priority: 3 + tube: default-2 + reserve_timeout: 10s + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml new file mode 100644 index 00000000..a4f31290 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -0,0 +1,27 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + # beanstalk address + addr: tcp://127.0.0.1:11300 + # connect timeout + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml new file mode 100644 index 00000000..87f46069 --- /dev/null +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -0,0 +1,31 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s + + consume: [ "test-1" ] diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml new file mode 100644 index 00000000..bf9f60cc --- /dev/null +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -0,0 +1,112 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:5672/ + +# beanstalk configuration +# +beanstalk: + # beanstalk address + addr: tcp://127.0.0.1:11300 + # connect timeout + timeout: 10s + +# amazon sqs configuration +# General section +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://127.0.0.1:9324 + +logs: + level: info + encoding: console + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: ephemeral + priority: 10 + prefetch: 10000 + + test-local-2: + driver: ephemeral + priority: 1 + prefetch: 10000 + + test-local-3: + driver: ephemeral + priority: 2 + prefetch: 10000 + + test-1: + driver: amqp + # QoS + prefetch: 1000000 + # Queue name + queue: test-1-queue + # Pipeline jobs priority, 1 - highest + priority: 1 + # Exchange + exchange: default + # Exchange type: direct, topic, fanout + exchange_type: direct + # Routing key for the queue + routing_key: test + # Declare a queue exclusive at the exchange + exclusive: false + # When multiple is true, this delivery and all prior unacknowledged deliveries + # on the same channel will be acknowledged. This is useful for batch processing + # of deliveries + multiple_ack: false + # When multiple is true, this delivery and all prior unacknowledged deliveries + # on the same channel will be acknowledged. This is useful for batch processing + # of deliveries + requeue_on_fail: false + + test-2-amqp: + driver: amqp + priority: 2 + prefetch: 1000000 + queue: test-2-queue + exchange: default + exchange_type: direct + routing_key: test-2 + + test-2: + driver: beanstalk + priority: 11 + tube: default + + test-3: + driver: sqs + prefetch: 1000000 + queue: default + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ] + diff --git a/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml new file mode 100644 index 00000000..861f7ec4 --- /dev/null +++ b/tests/plugins/jobs/durability/.rr-amqp-durability-redial.yaml @@ -0,0 +1,55 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +amqp: + addr: amqp://guest:[email protected]:23679/ + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: amqp + prefetch: 100 + queue: test-1-queue + priority: 1 + exchange: default + exchange_type: direct + routing_key: test-1 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + test-2: + driver: amqp + prefetch: 100 + queue: test-2-queue + priority: 2 + exchange: default + exchange_type: direct + routing_key: test-2 + exclusive: false + multiple_ack: false + requeue_on_fail: false + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml new file mode 100644 index 00000000..57d8ad2d --- /dev/null +++ b/tests/plugins/jobs/durability/.rr-beanstalk-durability-redial.yaml @@ -0,0 +1,44 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +beanstalk: + addr: tcp://127.0.0.1:11400 + timeout: 10s + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: beanstalk + priority: 11 + tube_priority: 1 + tube: default-1 + reserve_timeout: 10s + + test-2: + driver: beanstalk + priority: 11 + tube_priority: 3 + tube: default-2 + reserve_timeout: 10s + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml b/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml new file mode 100644 index 00000000..b6ba83a4 --- /dev/null +++ b/tests/plugins/jobs/durability/.rr-sqs-durability-redial.yaml @@ -0,0 +1,60 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://127.0.0.1:19324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + timeout: 20 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: sqs + prefetch: 10 + visibility_timeout: 0 + wait_time_seconds: 1 + queue: default + # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html + attributes: + DelaySeconds: 0 + MaximumMessageSize: 262144 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 0 + VisibilityTimeout: 0 + tags: + test: "tag" + + test-2: + driver: sqs + prefetch: 10 + queue: default-2 + wait_time_seconds: 1 + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml new file mode 100644 index 00000000..726c24ac --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml @@ -0,0 +1,21 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml new file mode 100644 index 00000000..8914dfaa --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: ephemeral + priority: 10 + prefetch: 10000 + + test-2: + driver: ephemeral + priority: 10 + prefetch: 10000 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml new file mode 100644 index 00000000..05dc3ffa --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml @@ -0,0 +1,21 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml new file mode 100644 index 00000000..e1b76263 --- /dev/null +++ b/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml @@ -0,0 +1,44 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: debug + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-local: + driver: ephemeral + priority: 10 + pipeline_size: 10000 + + test-local-2: + driver: ephemeral + priority: 1 + pipeline_size: 10000 + + test-local-3: + driver: ephemeral + priority: 2 + pipeline_size: 10000 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-local", "test-local-2" ] + diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go new file mode 100644 index 00000000..4c2f2fea --- /dev/null +++ b/tests/plugins/jobs/helpers.go @@ -0,0 +1,185 @@ +package jobs + +import ( + "bytes" + "net" + "net/http" + "net/rpc" + "testing" + + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + push string = "jobs.Push" + pause string = "jobs.Pause" + destroy string = "jobs.Destroy" + resume string = "jobs.Resume" +) + +func resumePipes(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(resume, pipe, er) + assert.NoError(t, err) + } +} + +func pushToDisabledPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: nil, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.Error(t, err) + } +} + +func pushToPipe(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + assert.NoError(t, err) + } +} + +func pushToPipeErr(pipeline string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ + Job: "some/php/namespace", + Id: "1", + Payload: `{"hello":"world"}`, + Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, + Options: &jobsv1beta.Options{ + Priority: 1, + Pipeline: pipeline, + Delay: 0, + }, + }} + + er := &jobsv1beta.Empty{} + err = client.Call(push, req, er) + require.Error(t, err) + } +} +func pausePipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(pause, pipe, er) + assert.NoError(t, err) + } +} + +func destroyPipelines(pipes ...string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, len(pipes))} + + for i := 0; i < len(pipes); i++ { + pipe.GetPipelines()[i] = pipes[i] + } + + er := &jobsv1beta.Empty{} + err = client.Call(destroy, pipe, er) + assert.NoError(t, err) + } +} + +func enableProxy(name string, t *testing.T) { + buf := new(bytes.Buffer) + buf.WriteString(`{"enabled":true}`) + + resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} + +func disableProxy(name string, t *testing.T) { + buf := new(bytes.Buffer) + buf.WriteString(`{"enabled":false}`) + + resp, err := http.Post("http://127.0.0.1:8474/proxies/"+name, "application/json", buf) //nolint:noctx + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} + +func deleteProxy(name string, t *testing.T) { + client := &http.Client{} + + req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:8474/proxies/"+name, nil) //nolint:noctx + require.NoError(t, err) + + resp, err := client.Do(req) + require.NoError(t, err) + + require.NoError(t, err) + require.Equal(t, 204, resp.StatusCode) + if resp.Body != nil { + _ = resp.Body.Close() + } +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go new file mode 100644 index 00000000..bb5281c0 --- /dev/null +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -0,0 +1,369 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAMQPInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestAMQPJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-amqp-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareAMQPPipeline", declareAMQPPipe) + t.Run("ConsumeAMQPPipeline", resumePipes("test-3")) + t.Run("PushAMQPPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PauseAMQPPipeline", pausePipelines("test-3")) + t.Run("DestroyAMQPPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareAMQPPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "amqp", + "name": "test-3", + "routing-key": "test-3", + "queue": "default", + "exchange-type": "direct", + "exchange": "amqp.default", + "prefetch": "100", + "priority": "3", + "exclusive": "true", + "multiple_ask": "true", + "requeue_on_fail": "true", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func TestAMQPNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "amqp/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go new file mode 100644 index 00000000..916ac08f --- /dev/null +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -0,0 +1,372 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBeanstalkInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestBeanstalkDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) + t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) + t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestBeanstalkJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-beanstalk-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver initialized", "driver", "beanstalk", "start", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes() + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes() + + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareBeanstalkPipeline", declareBeanstalkPipe) + t.Run("ConsumeBeanstalkPipeline", resumePipes("test-3")) + t.Run("PushBeanstalkPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PauseBeanstalkPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyBeanstalkPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestBeanstalkNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "beanstalk/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func declareBeanstalkPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "beanstalk", + "name": "test-3", + "tube": "default", + "reserve_timeout": "1", + "priority": "3", + "tube_priority": "10", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go new file mode 100644 index 00000000..0a882556 --- /dev/null +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -0,0 +1,424 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestEphemeralInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestEphemeralDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + + time.Sleep(time.Second * 5) +} + +func TestEphemeralPauseResume(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) + t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("pushToEnabledPipe", pushToPipe("test-local")) + + time.Sleep(time.Second * 1) + + stopCh <- struct{}{} + wg.Wait() +} + +func TestEphemeralJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) + t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) + t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func declareEphemeralPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "ephemeral", + "name": "test-3", + "prefetch": "10000", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} + +func consumeEphemeralPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.Pipelines{Pipelines: make([]string, 1)} + pipe.GetPipelines()[0] = "test-3" + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Resume", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go new file mode 100644 index 00000000..829fd102 --- /dev/null +++ b/tests/plugins/jobs/jobs_general_test.go @@ -0,0 +1,125 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" +) + +func TestJobsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-jobs-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + // mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &ephemeral.Plugin{}, + &amqp.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go new file mode 100644 index 00000000..d4cb4e52 --- /dev/null +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -0,0 +1,365 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSQSInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "sqs", "start", gomock.Any()).Times(2) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("sqs listener stopped").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() +} + +func TestSQSDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroySQSPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() +} + +func TestSQSJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-sqs-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("sqs listener stopped").Times(1) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclareSQSPipeline", declareSQSPipe) + t.Run("ConsumeSQSPipeline", resumePipes("test-3")) + t.Run("PushSQSPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PauseSQSPipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroySQSPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + + time.Sleep(time.Second * 5) +} + +func TestSQSNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "sqs/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func declareSQSPipe(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "sqs", + "name": "test-3", + "queue": "default", + "prefetch": "10", + "priority": "3", + "visibility_timeout": "0", + "wait_time_seconds": "3", + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) +} diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go new file mode 100644 index 00000000..71986db3 --- /dev/null +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -0,0 +1,396 @@ +package jobs + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + toxiproxy "github.com/Shopify/toxiproxy/client" + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" + "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDurabilityAMQP(t *testing.T) { + client := toxiproxy.NewClient("127.0.0.1:8474") + + _, err := client.CreateProxy("redial", "127.0.0.1:23679", "127.0.0.1:5672") + require.NoError(t, err) + defer deleteProxy("redial", t) + + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + require.NoError(t, err) + + cfg := &config.Viper{ + Path: "durability/.rr-amqp-durability-redial.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(4) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) + + // redial errors + mockLogger.EXPECT().Warn("rabbitmq reconnecting, caused by", "error", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("rabbitmq dial succeed. trying to redeclare queues and subscribers").AnyTimes() + mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + // mockLogger, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &amqp.Plugin{}, + ) + require.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + disableProxy("redial", t) + time.Sleep(time.Second * 3) + + go func() { + time.Sleep(time.Second * 5) + enableProxy("redial", t) + }() + + t.Run("PushPipelineWhileRedialing-1", pushToPipeErr("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipeErr("test-2")) + + time.Sleep(time.Second * 15) + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + + time.Sleep(time.Second * 5) + + stopCh <- struct{}{} + wg.Wait() +} + +func TestDurabilitySQS(t *testing.T) { + client := toxiproxy.NewClient("127.0.0.1:8474") + + _, err := client.CreateProxy("redial", "127.0.0.1:19324", "127.0.0.1:9324") + require.NoError(t, err) + defer deleteProxy("redial", t) + + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + require.NoError(t, err) + + cfg := &config.Viper{ + Path: "durability/.rr-sqs-durability-redial.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(4) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) + + // redial errors + mockLogger.EXPECT().Warn("rabbitmq reconnecting, caused by", "error", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("rabbitmq dial succeed. trying to redeclare queues and subscribers").AnyTimes() + mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + // mockLogger, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &sqs.Plugin{}, + ) + require.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + disableProxy("redial", t) + time.Sleep(time.Second * 3) + + go func() { + time.Sleep(time.Second * 2) + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + }() + + time.Sleep(time.Second * 5) + enableProxy("redial", t) + + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + + time.Sleep(time.Second * 10) + + stopCh <- struct{}{} + wg.Wait() +} + +func TestDurabilityBeanstalk(t *testing.T) { + client := toxiproxy.NewClient("127.0.0.1:8474") + + _, err := client.CreateProxy("redial", "127.0.0.1:11400", "127.0.0.1:11300") + require.NoError(t, err) + defer deleteProxy("redial", t) + + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + require.NoError(t, err) + + cfg := &config.Viper{ + Path: "durability/.rr-beanstalk-durability-redial.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(4) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4) + + // redial errors + mockLogger.EXPECT().Warn("rabbitmq reconnecting, caused by", "error", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("rabbitmq dial succeed. trying to redeclare queues and subscribers").AnyTimes() + mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + // mockLogger, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &beanstalk.Plugin{}, + ) + require.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + disableProxy("redial", t) + time.Sleep(time.Second * 3) + + go func() { + time.Sleep(time.Second * 2) + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + }() + + time.Sleep(time.Second * 5) + enableProxy("redial", t) + + t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1")) + t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2")) + + time.Sleep(time.Second * 10) + + stopCh <- struct{}{} + wg.Wait() +} diff --git a/tests/plugins/jobs/sqs/.rr-no-global.yaml b/tests/plugins/jobs/sqs/.rr-no-global.yaml new file mode 100644 index 00000000..2c97a37e --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-no-global.yaml @@ -0,0 +1,39 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: sqs + prefetch: 1000 + visibility_timeout: 0 + wait_time_seconds: 0 + queue: default + attributes: + DelaySeconds: 0 + MaximumMessageSize: 262144 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 0 + VisibilityTimeout: 30 + tags: + test: "tag" + + consume: [ "test-1" ] + diff --git a/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml b/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml new file mode 100644 index 00000000..21209cbb --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-sqs-declare.yaml @@ -0,0 +1,29 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +# amazon sqs configuration +# General section +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://127.0.0.1:9324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml new file mode 100644 index 00000000..ffdec1fd --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml @@ -0,0 +1,54 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://127.0.0.1:9324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: sqs + prefetch: 1000 + visibility_timeout: 0 + wait_time_seconds: 0 + queue: default + attributes: + DelaySeconds: 0 + MaximumMessageSize: 262144 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 0 + VisibilityTimeout: 30 + tags: + test: "tag" + + test-2: + driver: sqs + prefetch: 1000 + queue: default-2 + attributes: + MessageRetentionPeriod: 86400 + tags: + test: "tag" + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml b/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml new file mode 100644 index 00000000..b518d433 --- /dev/null +++ b/tests/plugins/jobs/sqs/.rr-sqs-jobs-err.yaml @@ -0,0 +1,28 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +sqs: + key: api-key + secret: api-secret + region: us-west-1 + endpoint: http://127.0.0.1:9324 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + timeout: 60 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index 34e22a4e..a13b591c 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -24,7 +24,7 @@ kv: memcached: driver: memcached - addr: [ "localhost:11211" ] + addr: [ "127.0.0.1:11211" ] # redis: # driver: redis diff --git a/tests/plugins/kv/configs/.rr-memcached.yaml b/tests/plugins/kv/configs/.rr-memcached.yaml index 68443bc4..da5d59c6 100644 --- a/tests/plugins/kv/configs/.rr-memcached.yaml +++ b/tests/plugins/kv/configs/.rr-memcached.yaml @@ -9,4 +9,4 @@ kv: memcached-rr: driver: memcached addr: - - "localhost:11211" + - "127.0.0.1:11211" diff --git a/tests/plugins/kv/configs/.rr-redis-global.yaml b/tests/plugins/kv/configs/.rr-redis-global.yaml index d2e8aefe..a4979879 100644 --- a/tests/plugins/kv/configs/.rr-redis-global.yaml +++ b/tests/plugins/kv/configs/.rr-redis-global.yaml @@ -7,7 +7,7 @@ logs: redis-rr: addrs: - - 'localhost:6379' + - '127.0.0.1:6379' kv: redis-rr: diff --git a/tests/plugins/kv/configs/.rr-redis.yaml b/tests/plugins/kv/configs/.rr-redis.yaml index 0a7396ca..522e365a 100644 --- a/tests/plugins/kv/configs/.rr-redis.yaml +++ b/tests/plugins/kv/configs/.rr-redis.yaml @@ -9,4 +9,4 @@ kv: redis-rr: driver: redis addrs: - - 'localhost:6379' + - '127.0.0.1:6379' diff --git a/tests/plugins/logger/logger_test.go b/tests/plugins/logger/logger_test.go index 9e3fa4da..ec4a748d 100644 --- a/tests/plugins/logger/logger_test.go +++ b/tests/plugins/logger/logger_test.go @@ -347,7 +347,7 @@ func TestFileLogger(t *testing.T) { } func httpEcho(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, "http://localhost:54224?hello=world", nil) + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:54224?hello=world", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) diff --git a/tests/plugins/metrics/configs/.rr-http-metrics.yaml b/tests/plugins/metrics/configs/.rr-http-metrics.yaml index 95f131c0..3e92a88c 100644 --- a/tests/plugins/metrics/configs/.rr-http-metrics.yaml +++ b/tests/plugins/metrics/configs/.rr-http-metrics.yaml @@ -13,7 +13,7 @@ http: num_workers: 1 metrics: - address: localhost:2112 + address: 127.0.0.1:2112 logs: mode: development diff --git a/tests/plugins/metrics/configs/.rr-test.yaml b/tests/plugins/metrics/configs/.rr-test.yaml index 4890076f..b5c4e64f 100644 --- a/tests/plugins/metrics/configs/.rr-test.yaml +++ b/tests/plugins/metrics/configs/.rr-test.yaml @@ -3,7 +3,7 @@ rpc: metrics: # prometheus client address (path /metrics added automatically) - address: localhost:2112 + address: 127.0.0.1:2112 collect: app_metric: type: histogram @@ -15,4 +15,4 @@ metrics: help: "Custom application counter." logs: mode: development - level: error
\ No newline at end of file + level: error diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go index 3d900fcc..c4ea9f2c 100644 --- a/tests/plugins/metrics/metrics_test.go +++ b/tests/plugins/metrics/metrics_test.go @@ -27,7 +27,7 @@ import ( const dialAddr = "127.0.0.1:6001" const dialNetwork = "tcp" -const getAddr = "http://localhost:2112/metrics" +const getAddr = "http://127.0.0.1:2112/metrics" // get request and return body func get() (string, error) { @@ -130,7 +130,7 @@ func TestMetricsIssue571(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", []string{"metrics"}).MinTimes(1) - mockLogger.EXPECT().Debug("200 GET http://localhost:56444/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("200 GET http://127.0.0.1:56444/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("declaring new metric", "name", "test", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("metric successfully added", "name", "test", "type", gomock.Any(), "namespace", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("metric successfully added", "name", "test", "labels", []string{}, "value", gomock.Any()).MinTimes(1) @@ -209,7 +209,7 @@ func TestMetricsIssue571(t *testing.T) { // get request and return body func issue571Http() (string, error) { - r, err := http.Get("http://localhost:56444") + r, err := http.Get("http://127.0.0.1:56444") if err != nil { return "", err } @@ -229,7 +229,7 @@ func issue571Http() (string, error) { // get request and return body func issue571Metrics() (string, error) { - r, err := http.Get("http://localhost:23557") + r, err := http.Get("http://127.0.0.1:23557") if err != nil { return "", err } @@ -989,7 +989,7 @@ func TestHTTPMetrics(t *testing.T) { mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() - mockLogger.EXPECT().Debug("200 GET http://localhost:13223/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("200 GET http://127.0.0.1:13223/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) err = cont.RegisterAll( cfg, @@ -1056,7 +1056,7 @@ func TestHTTPMetrics(t *testing.T) { } func echoHTTP(t *testing.T) { - req, err := http.NewRequest("GET", "http://localhost:13223", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:13223", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go index 96a191a1..1b84e339 100644 --- a/tests/plugins/redis/redis_plugin_test.go +++ b/tests/plugins/redis/redis_plugin_test.go @@ -21,7 +21,7 @@ func redisConfig(port string) string { cfg := ` redis: addrs: - - 'localhost:%s' + - '127.0.0.1:%s' master_name: '' username: '' password: '' diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go index 61942516..5c26cbd0 100644 --- a/tests/plugins/resetter/test_plugin.go +++ b/tests/plugins/resetter/test_plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" ) -var testPoolConfig = poolImpl.Config{ +var testPoolConfig = &poolImpl.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, diff --git a/tests/plugins/server/configs/.rr-tcp.yaml b/tests/plugins/server/configs/.rr-tcp.yaml index 4582482f..6b9c9ddb 100644 --- a/tests/plugins/server/configs/.rr-tcp.yaml +++ b/tests/plugins/server/configs/.rr-tcp.yaml @@ -5,7 +5,7 @@ server: env: - RR_CONFIG: "/some/place/on/the/C134" - RR_CONFIG2: "C138" - relay: "tcp://localhost:9999" + relay: "tcp://127.0.0.1:9999" relay_timeout: "20s" logs: mode: development diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go index f1c13734..d136da1e 100644 --- a/tests/plugins/server/plugin_pipes.go +++ b/tests/plugins/server/plugin_pipes.go @@ -15,7 +15,7 @@ import ( const ConfigSection = "server" const Response = "test" -var testPoolConfig = pool.Config{ +var testPoolConfig = &pool.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, @@ -45,7 +45,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/server/plugin_sockets.go b/tests/plugins/server/plugin_sockets.go index 0b2857e3..143a604c 100644 --- a/tests/plugins/server/plugin_sockets.go +++ b/tests/plugins/server/plugin_sockets.go @@ -30,7 +30,7 @@ func (f *Foo2) Serve() chan error { conf := &server.Config{} // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/server/plugin_tcp.go b/tests/plugins/server/plugin_tcp.go index ef4cea39..57a2e6ea 100644 --- a/tests/plugins/server/plugin_tcp.go +++ b/tests/plugins/server/plugin_tcp.go @@ -30,7 +30,7 @@ func (f *Foo3) Serve() chan error { conf := &server.Config{} // test payload for echo - r := payload.Payload{ + r := &payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/tests/plugins/server/tcp.php b/tests/plugins/server/tcp.php index 873f25b2..acc1e1a5 100644 --- a/tests/plugins/server/tcp.php +++ b/tests/plugins/server/tcp.php @@ -8,7 +8,7 @@ use Spiral\RoadRunner; require dirname(__DIR__) . "/../vendor/autoload.php"; -$relay = new Goridge\SocketRelay("localhost", 9999); +$relay = new Goridge\SocketRelay("127.0.0.1", 9999); $rr = new RoadRunner\Worker($relay); while ($in = $rr->waitPayload()) { diff --git a/tests/plugins/service/placeholder.go b/tests/plugins/service/placeholder.go deleted file mode 100644 index 6d43c336..00000000 --- a/tests/plugins/service/placeholder.go +++ /dev/null @@ -1 +0,0 @@ -package service diff --git a/tests/plugins/status/plugin_test.go b/tests/plugins/status/plugin_test.go index 663f4ee3..227cfd46 100644 --- a/tests/plugins/status/plugin_test.go +++ b/tests/plugins/status/plugin_test.go @@ -345,7 +345,7 @@ func TestReadinessRPCWorkerNotReady(t *testing.T) { func doHTTPReq(t *testing.T) { go func() { - req, err := http.NewRequest("GET", "http://localhost:11933", nil) + req, err := http.NewRequest("GET", "http://127.0.0.1:11933", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml index e6c43857..900094a4 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-allow.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml @@ -21,7 +21,7 @@ http: redis: addrs: - - "localhost:6379" + - "127.0.0.1:6379" broadcast: test: diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml index d537a80b..43f4b2ec 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml @@ -21,13 +21,13 @@ http: redis: addrs: - - "localhost:6379" + - "127.0.0.1:6379" broadcast: test: driver: redis addrs: - - "localhost:6379" + - "127.0.0.1:6379" websockets: broker: test diff --git a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml index 4deea30a..e0bdf993 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml @@ -23,7 +23,7 @@ broadcast: test: driver: redis addrs: - - "localhost:6379" + - "127.0.0.1:6379" websockets: broker: test diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml index 3557f5f1..e3d5f0b8 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml @@ -21,7 +21,7 @@ http: redis: addrs: - - "localhost:6379" + - "127.0.0.1:6379" broadcast: test: diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 53b6a572..bfdc980b 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -443,7 +443,7 @@ func RPCWsMemoryStop(port string) func(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NotNil(t, resp) @@ -613,7 +613,7 @@ func wsInit(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -648,7 +648,7 @@ func RPCWsPubAsync(port string) func(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -725,7 +725,7 @@ func RPCWsPub(port string) func(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -802,7 +802,7 @@ func RPCWsDeny(port string) func(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "127.0.0.1:" + port, Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) |