From 9ee78f937d5be67058882dd3590f89da35bca239 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 3 May 2021 22:52:30 +0300 Subject: - Initial broadcast commit Signed-off-by: Valery Piashchynski --- tests/plugins/http/handler_test.go | 64 ++++++++++++++++++------------------- tests/plugins/http/parse_test.go | 6 ++-- tests/plugins/http/response_test.go | 18 +++++------ tests/plugins/http/uploads_test.go | 10 +++--- 4 files changed, 49 insertions(+), 49 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 575fe656..4c7ab215 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -12,8 +12,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/stretchr/testify/assert" "net/http" @@ -35,7 +35,7 @@ func TestHandler_Echo(t *testing.T) { t.Fatal(err) } - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -66,7 +66,7 @@ func TestHandler_Echo(t *testing.T) { } func Test_HandlerErrors(t *testing.T) { - _, err := handler.NewHandler(1024, config.Uploads{ + _, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, nil) @@ -89,7 +89,7 @@ func TestHandler_Headers(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -150,7 +150,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -210,7 +210,7 @@ func TestHandler_User_Agent(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -270,7 +270,7 @@ func TestHandler_Cookies(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -335,7 +335,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -399,7 +399,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -459,7 +459,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -519,7 +519,7 @@ func TestHandler_FormData_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -592,7 +592,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -665,7 +665,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -737,7 +737,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -809,7 +809,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -881,7 +881,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -995,7 +995,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1109,7 +1109,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1225,7 +1225,7 @@ func TestHandler_Error(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1271,7 +1271,7 @@ func TestHandler_Error2(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1317,7 +1317,7 @@ func TestHandler_Error3(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1, config.Uploads{ + h, err := handler2.NewHandler(1, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1376,7 +1376,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1401,7 +1401,7 @@ func TestHandler_ResponseDuration(t *testing.T) { gotresp := make(chan interface{}) h.AddListener(func(event interface{}) { switch t := event.(type) { - case handler.ResponseEvent: + case handler2.ResponseEvent: if t.Elapsed() > 0 { close(gotresp) } @@ -1437,7 +1437,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1462,7 +1462,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { gotresp := make(chan interface{}) h.AddListener(func(event interface{}) { switch tp := event.(type) { - case handler.ResponseEvent: + case handler2.ResponseEvent: if tp.Elapsed() > time.Second { close(gotresp) } @@ -1497,7 +1497,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1522,7 +1522,7 @@ func TestHandler_ErrorDuration(t *testing.T) { goterr := make(chan interface{}) h.AddListener(func(event interface{}) { switch tp := event.(type) { - case handler.ErrorEvent: + case handler2.ErrorEvent: if tp.Elapsed() > 0 { close(goterr) } @@ -1571,7 +1571,7 @@ func TestHandler_IP(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1632,7 +1632,7 @@ func TestHandler_XRealIP(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1698,7 +1698,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1763,7 +1763,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1811,7 +1811,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p.Destroy(context.Background()) }() - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go index 15c82839..832ecde4 100644 --- a/tests/plugins/http/parse_test.go +++ b/tests/plugins/http/parse_test.go @@ -3,7 +3,7 @@ package http import ( "testing" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" + handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" ) var samples = []struct { @@ -21,7 +21,7 @@ var samples = []struct { func Test_FetchIndexes(t *testing.T) { for i := 0; i < len(samples); i++ { - r := handler.FetchIndexes(samples[i].in) + r := handler2.FetchIndexes(samples[i].in) if !same(r, samples[i].out) { t.Errorf("got %q, want %q", r, samples[i].out) } @@ -31,7 +31,7 @@ func Test_FetchIndexes(t *testing.T) { func BenchmarkConfig_FetchIndexes(b *testing.B) { for _, tt := range samples { for n := 0; n < b.N; n++ { - r := handler.FetchIndexes(tt.in) + r := handler2.FetchIndexes(tt.in) if !same(r, tt.out) { b.Fail() } diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 3564d9cd..648b6255 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/spiral/roadrunner/v2/pkg/payload" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" + handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) + r, err := handler2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -139,7 +139,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { w := &testWriter{h: http.Header(make(map[string][]string))} assert.NoError(t, r.Write(w)) - assert.Nil(t, w.h[handler.TrailerHeaderKey]) + assert.Nil(t, w.h[handler2.TrailerHeaderKey]) assert.Nil(t, w.h["foo"]) //nolint:staticcheck assert.Nil(t, w.h["baz"]) //nolint:staticcheck @@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := handler.NewResponse(payload.Payload{ + r, err := handler2.NewResponse(payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index 5c39589c..5381d30e 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -18,8 +18,8 @@ import ( j "github.com/json-iterator/go" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/stretchr/testify/assert" ) @@ -40,7 +40,7 @@ func TestHandler_Upload_File(t *testing.T) { t.Fatal(err) } - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, pool) @@ -123,7 +123,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { t.Fatal(err) } - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, pool) @@ -206,7 +206,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { t.Fatal(err) } - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: "-------", Forbid: []string{}, }, nil, pool) @@ -289,7 +289,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { t.Fatal(err) } - h, err := handler.NewHandler(1024, config.Uploads{ + h, err := handler2.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{".go"}, }, nil, pool) -- cgit v1.2.3 From dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 00:09:33 +0300 Subject: - Update all main abstractions - Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski --- tests/plugins/informer/.rr-informer.yaml | 6 +- .../websockets/configs/.rr-websockets-init.yaml | 50 ++++++++++ tests/plugins/websockets/websocket_plugin_test.go | 102 +++++++++++++++++++++ 3 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 tests/plugins/websockets/configs/.rr-websockets-init.yaml create mode 100644 tests/plugins/websockets/websocket_plugin_test.go (limited to 'tests/plugins') diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml index e1edbb44..94c9a856 100644 --- a/tests/plugins/informer/.rr-informer.yaml +++ b/tests/plugins/informer/.rr-informer.yaml @@ -3,8 +3,8 @@ server: user: "" group: "" env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" + - RR_CONFIG: "/some/place/on/the/C134" + - RR_CONFIG: "C138" relay: "pipes" relay_timeout: "20s" @@ -12,4 +12,4 @@ rpc: listen: tcp://127.0.0.1:6001 logs: mode: development - level: error \ No newline at end of file + level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml new file mode 100644 index 00000000..9973b2dc --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -0,0 +1,50 @@ +rpc: + listen: tcp://127.0.0.1:6001 + + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:15395 + max_request_size: 1024 + middleware: ["websockets"] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # also, they should implement RPC methods to publish data into them + # pubsubs might use general config section or its own + + pubsubs: [ "redis" ] + + # sample of the own config section for the redis pubsub driver + + redis: + addrs: + - localhost:1111 + # path used as websockets path + path: "/ws" +logs: + mode: development + level: debug + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go new file mode 100644 index 00000000..a9b90fd0 --- /dev/null +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -0,0 +1,102 @@ +package websockets + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/redis" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/stretchr/testify/assert" +) + +func TestBroadcastInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1000) + t.Run("test1", test1) + t.Run("test2", test2) + + stopCh <- struct{}{} + + wg.Wait() +} + +func test1(t *testing.T) { + +} + +func test2(t *testing.T) { + +} -- cgit v1.2.3 From cfaf3e5f8465d188546ef83600ad13c7dbde778c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 00:13:54 +0300 Subject: - Fix naming after bulk refactoring handler2 -> handler Signed-off-by: Valery Piashchynski --- tests/plugins/http/handler_test.go | 64 ++++++++++++++++++------------------- tests/plugins/http/parse_test.go | 6 ++-- tests/plugins/http/response_test.go | 18 +++++------ tests/plugins/http/uploads_test.go | 10 +++--- 4 files changed, 49 insertions(+), 49 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 4c7ab215..1fa29783 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -12,7 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" + "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" @@ -35,7 +35,7 @@ func TestHandler_Echo(t *testing.T) { t.Fatal(err) } - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -66,7 +66,7 @@ func TestHandler_Echo(t *testing.T) { } func Test_HandlerErrors(t *testing.T) { - _, err := handler2.NewHandler(1024, config.Uploads{ + _, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, nil) @@ -89,7 +89,7 @@ func TestHandler_Headers(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -150,7 +150,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -210,7 +210,7 @@ func TestHandler_User_Agent(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -270,7 +270,7 @@ func TestHandler_Cookies(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -335,7 +335,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -399,7 +399,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -459,7 +459,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -519,7 +519,7 @@ func TestHandler_FormData_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -592,7 +592,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -665,7 +665,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -737,7 +737,7 @@ func TestHandler_FormData_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -809,7 +809,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -881,7 +881,7 @@ func TestHandler_Multipart_POST(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -995,7 +995,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1109,7 +1109,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1225,7 +1225,7 @@ func TestHandler_Error(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1271,7 +1271,7 @@ func TestHandler_Error2(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1317,7 +1317,7 @@ func TestHandler_Error3(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1, config.Uploads{ + h, err := handler.NewHandler(1, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1376,7 +1376,7 @@ func TestHandler_ResponseDuration(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1401,7 +1401,7 @@ func TestHandler_ResponseDuration(t *testing.T) { gotresp := make(chan interface{}) h.AddListener(func(event interface{}) { switch t := event.(type) { - case handler2.ResponseEvent: + case handler.ResponseEvent: if t.Elapsed() > 0 { close(gotresp) } @@ -1437,7 +1437,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1462,7 +1462,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { gotresp := make(chan interface{}) h.AddListener(func(event interface{}) { switch tp := event.(type) { - case handler2.ResponseEvent: + case handler.ResponseEvent: if tp.Elapsed() > time.Second { close(gotresp) } @@ -1497,7 +1497,7 @@ func TestHandler_ErrorDuration(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) @@ -1522,7 +1522,7 @@ func TestHandler_ErrorDuration(t *testing.T) { goterr := make(chan interface{}) h.AddListener(func(event interface{}) { switch tp := event.(type) { - case handler2.ErrorEvent: + case handler.ErrorEvent: if tp.Elapsed() > 0 { close(goterr) } @@ -1571,7 +1571,7 @@ func TestHandler_IP(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1632,7 +1632,7 @@ func TestHandler_XRealIP(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1698,7 +1698,7 @@ func TestHandler_XForwardedFor(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1763,7 +1763,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, cidrs, p) @@ -1811,7 +1811,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { p.Destroy(context.Background()) }() - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, p) diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go index 832ecde4..32738ae0 100644 --- a/tests/plugins/http/parse_test.go +++ b/tests/plugins/http/parse_test.go @@ -3,7 +3,7 @@ package http import ( "testing" - handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" + "github.com/spiral/roadrunner/v2/pkg/worker_handler" ) var samples = []struct { @@ -21,7 +21,7 @@ var samples = []struct { func Test_FetchIndexes(t *testing.T) { for i := 0; i < len(samples); i++ { - r := handler2.FetchIndexes(samples[i].in) + r := handler.FetchIndexes(samples[i].in) if !same(r, samples[i].out) { t.Errorf("got %q, want %q", r, samples[i].out) } @@ -31,7 +31,7 @@ func Test_FetchIndexes(t *testing.T) { func BenchmarkConfig_FetchIndexes(b *testing.B) { for _, tt := range samples { for n := 0; n < b.N; n++ { - r := handler2.FetchIndexes(tt.in) + r := handler.FetchIndexes(tt.in) if !same(r, tt.out) { b.Fail() } diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 648b6255..5b72df40 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/spiral/roadrunner/v2/pkg/payload" - handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" + "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) + r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -139,7 +139,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { w := &testWriter{h: http.Header(make(map[string][]string))} assert.NoError(t, r.Write(w)) - assert.Nil(t, w.h[handler2.TrailerHeaderKey]) + assert.Nil(t, w.h[handler.TrailerHeaderKey]) assert.Nil(t, w.h["foo"]) //nolint:staticcheck assert.Nil(t, w.h["baz"]) //nolint:staticcheck @@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := handler2.NewResponse(payload.Payload{ + r, err := handler.NewResponse(payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index 5381d30e..82843d4e 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -18,7 +18,7 @@ import ( j "github.com/json-iterator/go" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler" + "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" ) @@ -40,7 +40,7 @@ func TestHandler_Upload_File(t *testing.T) { t.Fatal(err) } - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, pool) @@ -123,7 +123,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { t.Fatal(err) } - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{}, }, nil, pool) @@ -206,7 +206,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { t.Fatal(err) } - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: "-------", Forbid: []string{}, }, nil, pool) @@ -289,7 +289,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { t.Fatal(err) } - h, err := handler2.NewHandler(1024, config.Uploads{ + h, err := handler.NewHandler(1024, config.Uploads{ Dir: os.TempDir(), Forbid: []string{".go"}, }, nil, pool) -- cgit v1.2.3 From 0a9aea326045e56716f0736f7aa8520305362c51 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 13:26:12 +0300 Subject: - Move bst to the pkg folder - Add comments - Fix all golangci-lint warnings Signed-off-by: Valery Piashchynski --- tests/plugins/http/handler_test.go | 2 +- tests/plugins/http/parse_test.go | 2 +- tests/plugins/http/response_test.go | 2 +- tests/plugins/http/uploads_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 1fa29783..f6533dc4 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -12,7 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go index 32738ae0..d75620f3 100644 --- a/tests/plugins/http/parse_test.go +++ b/tests/plugins/http/parse_test.go @@ -3,7 +3,7 @@ package http import ( "testing" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" ) var samples = []struct { diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 5b72df40..276c22ef 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index 82843d4e..903a930a 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -18,7 +18,7 @@ import ( j "github.com/json-iterator/go" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" ) -- cgit v1.2.3 From e701d4d97fbbc7551e5d931731890933687ca8cd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 19:59:04 +0300 Subject: - Update Makefile - Update arch diagram Signed-off-by: Valery Piashchynski --- .../websockets/configs/.rr-websockets-init.yaml | 16 ++---- tests/plugins/websockets/websocket_plugin_test.go | 63 ++++++++++++++++++++-- 2 files changed, 61 insertions(+), 18 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index 9973b2dc..1b60d3e7 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -1,7 +1,6 @@ rpc: listen: tcp://127.0.0.1:6001 - server: command: "php ../../psr-worker-bench.php" user: "" @@ -10,11 +9,9 @@ server: relay_timeout: "20s" http: - address: 127.0.0.1:15395 + address: 127.0.0.1:11111 max_request_size: 1024 - middleware: ["websockets"] - uploads: - forbid: [ ".php", ".exe", ".bat" ] + middleware: [ "websockets" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] pool: num_workers: 2 @@ -30,16 +27,9 @@ websockets: # pubsubs should implement PubSub interface to be collected via endure.Collects # also, they should implement RPC methods to publish data into them # pubsubs might use general config section or its own - pubsubs: [ "redis" ] - - # sample of the own config section for the redis pubsub driver - - redis: - addrs: - - localhost:1111 - # path used as websockets path path: "/ws" + logs: mode: development level: debug diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index a9b90fd0..97577be8 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -1,6 +1,8 @@ package websockets import ( + "net/http" + "net/url" "os" "os/signal" "sync" @@ -8,14 +10,18 @@ import ( "testing" "time" + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -36,6 +42,7 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -84,19 +91,65 @@ func TestBroadcastInit(t *testing.T) { } }() - time.Sleep(time.Second * 1000) - t.Run("test1", test1) - t.Run("test2", test2) + time.Sleep(time.Second * 1) + t.Run("TestWSInit", wsInit) stopCh <- struct{}{} wg.Wait() } -func test1(t *testing.T) { +type Msg struct { + // Topic message been pushed into. + T []string `json:"topic"` + // Command (join, leave, headers) + C string `json:"command"` + + // Broker (redis, memory) + B string `json:"broker"` + + // Payload to be broadcasted + P []byte `json:"payload"` } -func test2(t *testing.T) { +func wsInit(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + m := &Msg{ + T: []string{"foo", "foo2"}, + C: "join", + B: "memory", + P: []byte("hello websockets"), + } + d, err := json.Marshal(m) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) } -- cgit v1.2.3 From 0a64bb2a71ddb6b0ee5861e255a20df1327aa099 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 28 May 2021 13:19:02 +0300 Subject: - Tests for the ws-redis, ws-memory Signed-off-by: Valery Piashchynski --- .../websockets/configs/.rr-websockets-init.yaml | 3 +- .../configs/.rr-websockets-redis-memory.yaml | 39 +++ tests/plugins/websockets/websocket_plugin_test.go | 294 +++++++++++++++++++-- 3 files changed, 314 insertions(+), 22 deletions(-) create mode 100644 tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml (limited to 'tests/plugins') diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index 1b60d3e7..dc073be3 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -25,14 +25,13 @@ redis: websockets: # pubsubs should implement PubSub interface to be collected via endure.Collects - # also, they should implement RPC methods to publish data into them # pubsubs might use general config section or its own pubsubs: [ "redis" ] path: "/ws" logs: mode: development - level: debug + level: error endure: grace_period: 120s diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml new file mode 100644 index 00000000..eedf5377 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml @@ -0,0 +1,39 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:13235 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # pubsubs might use general config section + pubsubs: [ "redis", "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 97577be8..294bbf0d 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -1,7 +1,9 @@ package websockets import ( + "net" "net/http" + "net/rpc" "net/url" "os" "os/signal" @@ -13,10 +15,11 @@ import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -25,8 +28,22 @@ import ( "github.com/stretchr/testify/assert" ) +type Msg struct { + // Topic message been pushed into. + Topics_ []string `json:"topic"` + + // Command (join, leave, headers) + Command_ string `json:"command"` + + // Broker (redis, memory) + Broker_ string `json:"broker"` + + // Payload to be broadcasted + Payload_ []byte `json:"payload"` +} + func TestBroadcastInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel)) + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ @@ -42,8 +59,8 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) + assert.NoError(t, err) err = cont.Init() @@ -99,27 +116,122 @@ func TestBroadcastInit(t *testing.T) { wg.Wait() } -type Msg struct { - // Topic message been pushed into. - T []string `json:"topic"` +func wsInit(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } - // Command (join, leave, headers) - C string `json:"command"` + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} - // Broker (redis, memory) - B string `json:"broker"` + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) - // Payload to be broadcasted - P []byte `json:"payload"` + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) } -func wsInit(t *testing.T) { +func TestWSRedisAndMemory(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-redis-memory.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemory", RPCWsMemory) + t.Run("RPCWsRedis", RPCWsRedis) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemory(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -128,14 +240,79 @@ func wsInit(t *testing.T) { _ = resp.Body.Close() }() - m := &Msg{ - T: []string{"foo", "foo2"}, - C: "join", - B: "memory", - P: []byte("hello websockets"), + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func RPCWsRedis(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, } - d, err := json.Marshal(m) + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -150,6 +327,83 @@ func wsInit(t *testing.T) { // subscription done assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + publish("", "redis", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "redis", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "redis", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) } + +func publish(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publish2(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func message(command string, broker string, payload []byte, topics ...string) *Msg { + return &Msg{ + Topics_: topics, + Command_: command, + Broker_: broker, + Payload_: payload, + } +} -- cgit v1.2.3 From 8f13eb958c7eec49acba6e343edb77c6ede89f09 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 28 May 2021 14:03:41 +0300 Subject: - PubAsync tests Signed-off-by: Valery Piashchynski --- tests/plugins/websockets/websocket_plugin_test.go | 103 ++++++++++++++++++++++ 1 file changed, 103 insertions(+) (limited to 'tests/plugins') diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 294bbf0d..087d1bc9 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -219,12 +219,85 @@ func TestWSRedisAndMemory(t *testing.T) { time.Sleep(time.Second * 1) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} wg.Wait() } +func RPCWsMemoryPubAsync(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publishAsync("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publishAsync("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publishAsync2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + func RPCWsMemory(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, @@ -384,6 +457,36 @@ func publish(command string, broker string, topics ...string) { } } +func publishAsync(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publishAsync2(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + func publish2(command string, broker string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { -- cgit v1.2.3 From fcda08498e8f914bbd0798da898818cd5d0e4348 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 29 May 2021 00:24:30 +0300 Subject: - Add new internal plugin - channel. Which used to deliver messages from the ws plugin to the http directly Signed-off-by: Valery Piashchynski --- tests/plugins/config/config_test.go | 15 +++++++++++++++ tests/plugins/gzip/plugin_test.go | 3 +++ tests/plugins/headers/headers_plugin_test.go | 5 +++++ tests/plugins/http/http_plugin_test.go | 22 ++++++++++++++++++++++ tests/plugins/logger/logger_test.go | 3 +++ tests/plugins/metrics/metrics_test.go | 2 ++ tests/plugins/reload/reload_plugin_test.go | 6 ++++++ tests/plugins/status/plugin_test.go | 5 +++++ tests/plugins/websockets/websocket_plugin_test.go | 19 +++++++++++-------- 9 files changed, 72 insertions(+), 8 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go index b6063cec..3cf026bd 100755 --- a/tests/plugins/config/config_test.go +++ b/tests/plugins/config/config_test.go @@ -7,6 +7,7 @@ import ( "time" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -33,6 +34,11 @@ func TestViperProvider_Init(t *testing.T) { t.Fatal(err) } + err = container.Register(&channel.Plugin{}) + if err != nil { + t.Fatal(err) + } + err = container.Init() if err != nil { t.Fatal(err) @@ -82,6 +88,7 @@ func TestConfigOverwriteFail(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -103,6 +110,7 @@ func TestConfigOverwriteValid(t *testing.T) { &logger.ZapLogger{}, &rpc.Plugin{}, vp, + &channel.Plugin{}, &Foo2{}, ) assert.NoError(t, err) @@ -155,6 +163,7 @@ func TestConfigEnvVariables(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -206,6 +215,7 @@ func TestConfigEnvVariablesFail(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -237,6 +247,11 @@ func TestConfigProvider_GeneralSection(t *testing.T) { t.Fatal(err) } + err = container.Register(&channel.Plugin{}) + if err != nil { + t.Fatal(err) + } + err = container.Init() if err != nil { t.Fatal(err) diff --git a/tests/plugins/gzip/plugin_test.go b/tests/plugins/gzip/plugin_test.go index 844fd411..5294e672 100644 --- a/tests/plugins/gzip/plugin_test.go +++ b/tests/plugins/gzip/plugin_test.go @@ -11,6 +11,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -35,6 +36,7 @@ func TestGzipPlugin(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &gzip.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -128,6 +130,7 @@ func TestMiddlewareNotExist(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &gzip.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/headers/headers_plugin_test.go b/tests/plugins/headers/headers_plugin_test.go index 49d86b00..e4903335 100644 --- a/tests/plugins/headers/headers_plugin_test.go +++ b/tests/plugins/headers/headers_plugin_test.go @@ -11,6 +11,7 @@ import ( "time" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/headers" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -34,6 +35,7 @@ func TestHeadersInit(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -100,6 +102,7 @@ func TestRequestHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -185,6 +188,7 @@ func TestResponseHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -271,6 +275,7 @@ func TestCORSHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index 128eec26..aa57077d 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -24,6 +24,7 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -62,6 +63,7 @@ func TestHTTPInit(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -126,6 +128,7 @@ func TestHTTPNoConfigSection(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -193,6 +196,7 @@ func TestHTTPInformerReset(t *testing.T) { &httpPlugin.Plugin{}, &informer.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -315,6 +319,7 @@ func TestSSL(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -451,6 +456,7 @@ func TestSSLRedirect(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -540,6 +546,7 @@ func TestSSLPushPipes(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -630,6 +637,7 @@ func TestFastCGI_RequestUri(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -724,6 +732,7 @@ func TestH2CUpgrade(t *testing.T) { mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -815,6 +824,7 @@ func TestH2C(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -907,6 +917,7 @@ func TestHttpMiddleware(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1053,6 +1064,7 @@ logs: &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1138,6 +1150,7 @@ func TestHttpEnvVariables(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1225,6 +1238,7 @@ func TestHttpBrokenPipes(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1286,6 +1300,7 @@ func TestHTTPSupervisedPool(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &informer.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1488,6 +1503,7 @@ func TestHTTPBigRequestSize(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1580,6 +1596,7 @@ func TestStaticEtagPlugin(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1678,6 +1695,7 @@ func TestStaticPluginSecurity(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1827,6 +1845,7 @@ func TestStaticPlugin(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -1941,6 +1960,7 @@ func TestStaticDisabled_Error(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) assert.Error(t, cont.Init()) @@ -1962,6 +1982,7 @@ func TestStaticFilesDisabled(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -2054,6 +2075,7 @@ func TestStaticFilesForbid(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/logger/logger_test.go b/tests/plugins/logger/logger_test.go index d2877781..f63a6a5d 100644 --- a/tests/plugins/logger/logger_test.go +++ b/tests/plugins/logger/logger_test.go @@ -9,6 +9,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -98,6 +99,7 @@ func TestLoggerRawErr(t *testing.T) { mockLogger, &server.Plugin{}, &http.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -224,6 +226,7 @@ func TestLoggerNoConfig2(t *testing.T) { &logger.ZapLogger{}, &http.Plugin{}, &server.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go index 8be567ec..48c01f24 100644 --- a/tests/plugins/metrics/metrics_test.go +++ b/tests/plugins/metrics/metrics_test.go @@ -15,6 +15,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -144,6 +145,7 @@ func TestMetricsIssue571(t *testing.T) { &server.Plugin{}, mockLogger, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go index 6db7b6d0..41c9c92f 100644 --- a/tests/plugins/reload/reload_plugin_test.go +++ b/tests/plugins/reload/reload_plugin_test.go @@ -16,6 +16,7 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/reload" @@ -65,6 +66,7 @@ func TestReloadInit(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -161,6 +163,7 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -270,6 +273,7 @@ func TestReloadFilterFileExt(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -400,6 +404,7 @@ func TestReloadCopy100(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -677,6 +682,7 @@ func TestReloadNoRecursion(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/status/plugin_test.go b/tests/plugins/status/plugin_test.go index 663f4ee3..06983199 100644 --- a/tests/plugins/status/plugin_test.go +++ b/tests/plugins/status/plugin_test.go @@ -14,6 +14,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -38,6 +39,7 @@ func TestStatusHttp(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -125,6 +127,7 @@ func TestStatusRPC(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -204,6 +207,7 @@ func TestReadyHttp(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -291,6 +295,7 @@ func TestReadinessRPCWorkerNotReady(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 087d1bc9..61ef186b 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -16,6 +16,7 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -30,16 +31,16 @@ import ( type Msg struct { // Topic message been pushed into. - Topics_ []string `json:"topic"` + Topics []string `json:"topic"` // Command (join, leave, headers) - Command_ string `json:"command"` + Command string `json:"command"` // Broker (redis, memory) - Broker_ string `json:"broker"` + Broker string `json:"broker"` // Payload to be broadcasted - Payload_ []byte `json:"payload"` + Payload []byte `json:"payload"` } func TestBroadcastInit(t *testing.T) { @@ -59,6 +60,7 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -168,6 +170,7 @@ func TestWSRedisAndMemory(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &channel.Plugin{}, ) assert.NoError(t, err) @@ -504,9 +507,9 @@ func publish2(command string, broker string, topics ...string) { func message(command string, broker string, payload []byte, topics ...string) *Msg { return &Msg{ - Topics_: topics, - Command_: command, - Broker_: broker, - Payload_: payload, + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, } } -- cgit v1.2.3 From 49703d70a3ede70ce9a0cab824cbcb96dbf824c0 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 31 May 2021 16:05:00 +0300 Subject: - Rework access_validators - WS plugin uses it's own pool to handle requests on the /ws (or any user-defined) endpoint - Ability to write custom validators Signed-off-by: Valery Piashchynski --- tests/plugins/config/config_test.go | 15 --------------- tests/plugins/gzip/plugin_test.go | 3 --- tests/plugins/headers/headers_plugin_test.go | 5 ----- tests/plugins/http/http_plugin_test.go | 22 ---------------------- tests/plugins/logger/logger_test.go | 3 --- tests/plugins/metrics/metrics_test.go | 2 -- tests/plugins/reload/reload_plugin_test.go | 6 ------ tests/plugins/status/plugin_test.go | 5 ----- tests/plugins/websockets/websocket_plugin_test.go | 7 +++---- 9 files changed, 3 insertions(+), 65 deletions(-) (limited to 'tests/plugins') diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go index 3cf026bd..b6063cec 100755 --- a/tests/plugins/config/config_test.go +++ b/tests/plugins/config/config_test.go @@ -7,7 +7,6 @@ import ( "time" endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -34,11 +33,6 @@ func TestViperProvider_Init(t *testing.T) { t.Fatal(err) } - err = container.Register(&channel.Plugin{}) - if err != nil { - t.Fatal(err) - } - err = container.Init() if err != nil { t.Fatal(err) @@ -88,7 +82,6 @@ func TestConfigOverwriteFail(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -110,7 +103,6 @@ func TestConfigOverwriteValid(t *testing.T) { &logger.ZapLogger{}, &rpc.Plugin{}, vp, - &channel.Plugin{}, &Foo2{}, ) assert.NoError(t, err) @@ -163,7 +155,6 @@ func TestConfigEnvVariables(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -215,7 +206,6 @@ func TestConfigEnvVariablesFail(t *testing.T) { &rpc.Plugin{}, vp, &Foo2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -247,11 +237,6 @@ func TestConfigProvider_GeneralSection(t *testing.T) { t.Fatal(err) } - err = container.Register(&channel.Plugin{}) - if err != nil { - t.Fatal(err) - } - err = container.Init() if err != nil { t.Fatal(err) diff --git a/tests/plugins/gzip/plugin_test.go b/tests/plugins/gzip/plugin_test.go index 5294e672..844fd411 100644 --- a/tests/plugins/gzip/plugin_test.go +++ b/tests/plugins/gzip/plugin_test.go @@ -11,7 +11,6 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -36,7 +35,6 @@ func TestGzipPlugin(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &gzip.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -130,7 +128,6 @@ func TestMiddlewareNotExist(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &gzip.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/headers/headers_plugin_test.go b/tests/plugins/headers/headers_plugin_test.go index e4903335..49d86b00 100644 --- a/tests/plugins/headers/headers_plugin_test.go +++ b/tests/plugins/headers/headers_plugin_test.go @@ -11,7 +11,6 @@ import ( "time" endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/headers" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -35,7 +34,6 @@ func TestHeadersInit(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -102,7 +100,6 @@ func TestRequestHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -188,7 +185,6 @@ func TestResponseHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -275,7 +271,6 @@ func TestCORSHeaders(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &headers.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index aa57077d..128eec26 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -24,7 +24,6 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/gzip" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -63,7 +62,6 @@ func TestHTTPInit(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -128,7 +126,6 @@ func TestHTTPNoConfigSection(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -196,7 +193,6 @@ func TestHTTPInformerReset(t *testing.T) { &httpPlugin.Plugin{}, &informer.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -319,7 +315,6 @@ func TestSSL(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -456,7 +451,6 @@ func TestSSLRedirect(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -546,7 +540,6 @@ func TestSSLPushPipes(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -637,7 +630,6 @@ func TestFastCGI_RequestUri(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -732,7 +724,6 @@ func TestH2CUpgrade(t *testing.T) { mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -824,7 +815,6 @@ func TestH2C(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -917,7 +907,6 @@ func TestHttpMiddleware(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1064,7 +1053,6 @@ logs: &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1150,7 +1138,6 @@ func TestHttpEnvVariables(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1238,7 +1225,6 @@ func TestHttpBrokenPipes(t *testing.T) { &httpPlugin.Plugin{}, &PluginMiddleware{}, &PluginMiddleware2{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1300,7 +1286,6 @@ func TestHTTPSupervisedPool(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &informer.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1503,7 +1488,6 @@ func TestHTTPBigRequestSize(t *testing.T) { &logger.ZapLogger{}, &server.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1596,7 +1580,6 @@ func TestStaticEtagPlugin(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1695,7 +1678,6 @@ func TestStaticPluginSecurity(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1845,7 +1827,6 @@ func TestStaticPlugin(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -1960,7 +1941,6 @@ func TestStaticDisabled_Error(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) assert.Error(t, cont.Init()) @@ -1982,7 +1962,6 @@ func TestStaticFilesDisabled(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -2075,7 +2054,6 @@ func TestStaticFilesForbid(t *testing.T) { &httpPlugin.Plugin{}, &gzip.Plugin{}, &static.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/logger/logger_test.go b/tests/plugins/logger/logger_test.go index f63a6a5d..d2877781 100644 --- a/tests/plugins/logger/logger_test.go +++ b/tests/plugins/logger/logger_test.go @@ -9,7 +9,6 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -99,7 +98,6 @@ func TestLoggerRawErr(t *testing.T) { mockLogger, &server.Plugin{}, &http.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -226,7 +224,6 @@ func TestLoggerNoConfig2(t *testing.T) { &logger.ZapLogger{}, &http.Plugin{}, &server.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go index 48c01f24..8be567ec 100644 --- a/tests/plugins/metrics/metrics_test.go +++ b/tests/plugins/metrics/metrics_test.go @@ -15,7 +15,6 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -145,7 +144,6 @@ func TestMetricsIssue571(t *testing.T) { &server.Plugin{}, mockLogger, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go index 41c9c92f..6db7b6d0 100644 --- a/tests/plugins/reload/reload_plugin_test.go +++ b/tests/plugins/reload/reload_plugin_test.go @@ -16,7 +16,6 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/reload" @@ -66,7 +65,6 @@ func TestReloadInit(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -163,7 +161,6 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -273,7 +270,6 @@ func TestReloadFilterFileExt(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -404,7 +400,6 @@ func TestReloadCopy100(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -682,7 +677,6 @@ func TestReloadNoRecursion(t *testing.T) { &httpPlugin.Plugin{}, &reload.Plugin{}, &resetter.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/status/plugin_test.go b/tests/plugins/status/plugin_test.go index 06983199..663f4ee3 100644 --- a/tests/plugins/status/plugin_test.go +++ b/tests/plugins/status/plugin_test.go @@ -14,7 +14,6 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -39,7 +38,6 @@ func TestStatusHttp(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -127,7 +125,6 @@ func TestStatusRPC(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -207,7 +204,6 @@ func TestReadyHttp(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -295,7 +291,6 @@ func TestReadinessRPCWorkerNotReady(t *testing.T) { &server.Plugin{}, &httpPlugin.Plugin{}, &status.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 61ef186b..6b11f9e1 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -16,7 +16,6 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -60,7 +59,6 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -170,7 +168,6 @@ func TestWSRedisAndMemory(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, - &channel.Plugin{}, ) assert.NoError(t, err) @@ -313,7 +310,9 @@ func RPCWsMemory(t *testing.T) { assert.NoError(t, err) defer func() { - _ = resp.Body.Close() + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } }() d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) -- cgit v1.2.3 From df4d316d519cea6dff654bd917521a616a37f769 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 31 May 2021 23:59:52 +0300 Subject: - Add more ws tests Signed-off-by: Valery Piashchynski --- .../configs/.rr-websockets-memory-allow.yaml | 37 ++ .../configs/.rr-websockets-memory-deny.yaml | 37 ++ .../configs/.rr-websockets-memory-stop.yaml | 37 ++ tests/plugins/websockets/websocket_plugin_test.go | 371 +++++++++++++++++++++ 4 files changed, 482 insertions(+) create mode 100644 tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml create mode 100644 tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml create mode 100644 tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml (limited to 'tests/plugins') diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml new file mode 100644 index 00000000..896cee05 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-ok.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11113 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml new file mode 100644 index 00000000..e3bf5218 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-deny.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11112 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml new file mode 100644 index 00000000..0614f4e7 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-stop.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11114 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 6b11f9e1..772b53ac 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -444,6 +444,377 @@ func RPCWsRedis(t *testing.T) { assert.NoError(t, err) } +func TestWSMemoryDeny(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-deny.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryDeny", RPCWsMemoryDeny) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryDeny(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11112", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func TestWSMemoryStop(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-stop.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryStop", RPCWsMemoryStop) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryStop(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11114", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NotNil(t, resp) + assert.Error(t, err) + assert.Nil(t, c) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck + assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck + if resp != nil && resp.Body != nil { //nolint:staticcheck + _ = resp.Body.Close() + } +} + +func TestWSMemoryOk(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-allow.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryAllow", RPCWsMemoryAllow) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryAllow(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11113", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + func publish(command string, broker string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { -- cgit v1.2.3