diff options
74 files changed, 3831 insertions, 259 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 82072675..cee7085c 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -13,6 +13,6 @@ jobs: - name: Run linter uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action> with: - version: v1.39 # without patch version + version: v1.40 # without patch version only-new-issues: false # show only new issues if it's a pull request args: --timeout=10m diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 89173b3f..a8f97d12 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -60,8 +60,10 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/transport/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/storage_ws.txt -covermode=atomic ./plugins/websockets/storage go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload @@ -77,6 +79,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets docker-compose -f ./tests/docker-compose.yaml down cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -27,3 +27,4 @@ tests/vendor/ .rr-sample.yaml cmd rr +**/old diff --git a/.golangci.yml b/.golangci.yml index bfc69f57..41e1d68f 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -14,8 +14,10 @@ output: linters-settings: govet: check-shadowing: true - golint: - min-confidence: 0.1 + revive: + confidence: 0.8 + errorCode: 0 + warningCode: 0 gocyclo: min-complexity: 15 godot: @@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/ - gocritic # The most opinionated Go source code linter - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification - goimports # Goimports does everything that gofmt does. Additionally it checks unused imports - - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes + - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes - goprintffuncname # Checks that printf-like functions are named with `f` at the end - gosec # Inspects source code for security problems - gosimple # Linter for Go source code that specializes in simplifying a code @@ -88,3 +90,4 @@ issues: - goconst - noctx - gosimple + - revive @@ -13,7 +13,9 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/storage-ws.out -covermode=atomic ./plugins/websockets/storage go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload @@ -29,6 +31,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/resetter.out -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/rpc.out -covermode=atomic ./tests/plugins/rpc go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/kv_plugin.out -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_plugin.out -covermode=atomic ./tests/plugins/websockets cat ./coverage/*.out > ./coverage/summary.out docker-compose -f tests/docker-compose.yaml down @@ -39,7 +42,9 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/pool go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./tests/plugins/http + go test -v -race -tags=debug ./plugins/websockets/storage go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload @@ -55,4 +60,5 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/websockets docker-compose -f tests/docker-compose.yaml down diff --git a/codecov.yml b/codecov.yml index 43716f56..da01f6f7 100644 --- a/codecov.yml +++ b/codecov.yml @@ -14,10 +14,21 @@ coverage: # do not include tests folders ignore: - "tests" - - "plugins/kv/boltdb/plugin_unit_test.go" - - "plugins/kv/memcached/plugin_unit_test.go" - - "plugins/kv/memory/plugin_unit_test.go" + - "plugins/metrics/config_test.go" + - "plugins/websockets/storage/storage_test.go" + - "pkg/bst/bst_test.go" + - "pkg/doc" + - "pkg/pool/static_pool_test.go" + - "pkg/pool/supervisor_test.go" + - "pkg/pubsub" + - "pkg/transport/pipe/pipe_factory_spawn_test.go" + - "pkg/transport/pipe/pipe_factory_test.go" + - "pkg/transport/socket/socket_factory_spawn_test.go" + - "pkg/transport/socket/socket_factory_test.go" + - "pkg/transport/interface.go" + - "pkg/worker/state_test.go" + - "pkg/worker/sync_worker_test.go" + - "pkg/worker/worker_test.go" - "pkg/events/pool_events.go" - "pkg/events/worker_events.go" - - "interfaces" - - "systemd"
\ No newline at end of file + - "systemd" @@ -8,16 +8,20 @@ require ( github.com/alicebob/miniredis/v2 v2.14.5 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/dustin/go-humanize v1.0.0 + github.com/fasthttp/websocket v1.4.3 github.com/fatih/color v1.12.0 github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-redis/redis/v8 v8.9.0 github.com/gofiber/fiber/v2 v2.10.0 github.com/golang/mock v1.4.4 github.com/google/flatbuffers v1.12.1 + github.com/google/uuid v1.2.0 github.com/hashicorp/go-multierror v1.1.1 github.com/json-iterator/go v1.1.11 + github.com/klauspost/compress v1.12.2 // indirect github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.10.0 + github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 // indirect github.com/shirou/gopsutil v3.21.3+incompatible github.com/spf13/viper v1.7.1 // SPIRAL ==== @@ -27,12 +31,13 @@ require ( // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.4 // indirect - github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a + github.com/valyala/fasthttp v1.24.0 // indirect + github.com/valyala/tcplisten v1.0.0 github.com/yookoala/gofast v0.6.0 go.etcd.io/bbolt v1.3.5 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.17.0 golang.org/x/net v0.0.0-20210226101413-39120d07d75e golang.org/x/sync v0.0.0-20201207232520-09787c993a3a - golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 + golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 ) @@ -33,6 +33,7 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.14.5 h1:iCFJiSur7871KaFJLAsBEpmc3DJHJ4YuB7W1hYLWs+U= github.com/alicebob/miniredis/v2 v2.14.5/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= +github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -96,6 +97,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4= +github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= @@ -146,6 +149,7 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= @@ -163,6 +167,8 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -225,9 +231,11 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -358,6 +366,9 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8= +github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 h1:ug9pfpEqVhvazyl1GezkQ9M/XdWsQn3VSx0s4qfH82I= +github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645/go.mod h1:CN0/b5o0sSBi9K8fZTUamBG5NZXO0I64vTh9L3Mzhn0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8= github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -415,10 +426,13 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.23.0 h1:0ufwSD9BhWa6f8HWdmdq4FHQ23peRo3Ng/Qs8m5NcFs= +github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE= github.com/valyala/fasthttp v1.23.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= +github.com/valyala/fasthttp v1.24.0 h1:AAiG4oLDUArTb7rYf9oO2bkGooOqCaUF6a2u8asBP3I= +github.com/valyala/fasthttp v1.24.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -512,6 +526,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -570,8 +585,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go new file mode 100644 index 00000000..664937ba --- /dev/null +++ b/pkg/bst/bst.go @@ -0,0 +1,136 @@ +package bst + +// BST ... +type BST struct { + // registered topic, not unique + topic string + // associated connections with the topic + uuids map[string]struct{} + + // left and right subtrees + left *BST + right *BST +} + +func NewBST() Storage { + return &BST{ + uuids: make(map[string]struct{}, 10), + } +} + +// Insert uuid to the topic +func (b *BST) Insert(uuid string, topic string) { + curr := b + + for { + if topic == curr.topic { + curr.uuids[uuid] = struct{}{} + return + } + // if topic less than curr topic + if topic < curr.topic { + if curr.left == nil { + curr.left = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + // move forward + curr = curr.left + } else { + if curr.right == nil { + curr.right = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + + curr = curr.right + } + } +} + +func (b *BST) Get(topic string) map[string]struct{} { + curr := b + for curr != nil { + switch { + case topic < curr.topic: + curr = curr.left + case topic > curr.topic: + curr = curr.right + case topic == curr.topic: + return curr.uuids + } + } + + return nil +} + +func (b *BST) Remove(uuid string, topic string) { + b.removeHelper(uuid, topic, nil) +} + +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit + curr := b + for curr != nil { + if topic < curr.topic { //nolint:gocritic + parent = curr + curr = curr.left + } else if topic > curr.topic { + parent = curr + curr = curr.right + } else { + // if more than 1 topic - remove only topic, do not remove the whole vertex + if len(curr.uuids) > 1 { + if _, ok := curr.uuids[uuid]; ok { + delete(curr.uuids, uuid) + return + } + } + + if curr.left != nil && curr.right != nil { //nolint:gocritic + curr.topic, curr.uuids = curr.right.traverseForMinString() + curr.right.removeHelper(curr.topic, uuid, curr) + } else if parent == nil { + if curr.left != nil { //nolint:gocritic + curr.topic = curr.left.topic + curr.uuids = curr.left.uuids + + curr.right = curr.left.right + curr.left = curr.left.left + } else if curr.right != nil { + curr.topic = curr.right.topic + curr.uuids = curr.right.uuids + + curr.left = curr.right.left + curr.right = curr.right.right + } else { //nolint:staticcheck + // single node tree + } + } else if parent.left == curr { + if curr.left != nil { + parent.left = curr.left + } else { + parent.left = curr.right + } + } else if parent.right == curr { + if curr.left != nil { + parent.right = curr.left + } else { + parent.right = curr.right + } + } + break + } + } +} + +//go:inline +func (b *BST) traverseForMinString() (string, map[string]struct{}) { + if b.left == nil { + return b.topic, b.uuids + } + return b.left.traverseForMinString() +} diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go new file mode 100644 index 00000000..e4d4e4c3 --- /dev/null +++ b/pkg/bst/bst_test.go @@ -0,0 +1,394 @@ +package bst + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +const predifined = "chat-1-2" + +func TestNewBST(t *testing.T) { + // create a new bst + g := NewBST() + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments2") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments3") + } + + // should be 100 + exist := g.Get("comments") + assert.Len(t, exist, 100) + + // should be 100 + exist2 := g.Get("comments2") + assert.Len(t, exist2, 100) + + // should be 100 + exist3 := g.Get("comments3") + assert.Len(t, exist3, 100) +} + +func BenchmarkGraph(b *testing.B) { + g := NewBST() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.Insert(uuid.NewString(), uid) + } + + g.Insert(uuid.NewString(), predifined) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + exist := g.Get(predifined) + _ = exist + } +} + +func BenchmarkBigSearch(b *testing.B) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + _ = exist + } + } +} + +func BenchmarkBigSearchWithRemoves(b *testing.B) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + go func() { + tt := time.NewTicker(time.Millisecond) + for { + select { + case <-tt.C: + num := rand.Intn(333) //nolint:gosec + values := g1.Get(predefinedSlice[num]) + for k := range values { + g1.Remove(k, predefinedSlice[num]) + } + } + } + }() + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + _ = exist + } + } + for i := 0; i < b.N; i++ { + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + _ = exist + } + } +} + +func TestGraph(t *testing.T) { + g := NewBST() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.Insert(uuid.NewString(), uid) + } + + g.Insert(uuid.NewString(), predifined) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) +} + +func TestTreeConcurrentContains(t *testing.T) { + g := NewBST() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + for i := 0; i < 100; i++ { + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + + go func() { + _ = g.Get(predifined) + }() + } + + time.Sleep(time.Second * 2) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 5) +} + +func TestGraphRemove(t *testing.T) { + g := NewBST() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + exist := g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 5) + + g.Remove(key1, predifined) + + exist = g.Get(predifined) + assert.NotNil(t, exist) + assert.Len(t, exist, 4) +} + +func TestBigSearch(t *testing.T) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + t.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 1000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } +} + +func TestBigSearchWithRemoves(t *testing.T) { + g1 := NewBST() + g2 := NewBST() + g3 := NewBST() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + t.FailNow() + } + + for i := 0; i < 100000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 100000; i++ { + g2.Insert(uuid.NewString(), uuid.NewString()) + } + for i := 0; i < 100000; i++ { + g3.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 333; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + for i := 0; i < 333; i++ { + g2.Insert(uuid.NewString(), predefinedSlice[333+i]) + } + + for i := 0; i < 333; i++ { + g3.Insert(uuid.NewString(), predefinedSlice[666+i]) + } + + time.Sleep(time.Second * 1) + go func() { + tt := time.NewTicker(time.Second) + for { + select { + case <-tt.C: + num := rand.Intn(333) //nolint:gosec + values := g1.Get(predefinedSlice[num]) + for k := range values { + g1.Remove(k, predefinedSlice[num]) + } + } + } + }() + + for i := 0; i < 333; i++ { + exist := g1.Get(predefinedSlice[i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g2.Get(predefinedSlice[333+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } + + for i := 0; i < 333; i++ { + exist := g3.Get(predefinedSlice[666+i]) + assert.NotNil(t, exist) + assert.Len(t, exist, 1) + } +} diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go new file mode 100644 index 00000000..abb7e6e9 --- /dev/null +++ b/pkg/bst/doc.go @@ -0,0 +1,7 @@ +package bst + +/* +Binary search tree for the pubsub + +The vertex may have one or multiply topics associated with the single websocket connection UUID +*/ diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go new file mode 100644 index 00000000..ecf40414 --- /dev/null +++ b/pkg/bst/interface.go @@ -0,0 +1,11 @@ +package bst + +// Storage is general in-memory BST storage implementation +type Storage interface { + // Insert inserts to a vertex with topic ident connection uuid + Insert(uuid string, topic string) + // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed + Remove(uuid, topic string) + // Get will return all connections associated with the topic + Get(topic string) map[string]struct{} +} diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4ef2f2e7..c22fbbd3 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -18,7 +18,7 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) - // Remove worker from the pool. + // RemoveWorker removes worker from the pool. RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index d57cc95c..b5d97b8b 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -47,7 +47,7 @@ type StaticPool struct { allocator worker.Allocator // err_encoder is the default Exec error encoder - err_encoder ErrorEncoder //nolint:golint,stylecheck + err_encoder ErrorEncoder //nolint:stylecheck } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (payload.Payload, error) { - const op = errors.Op("error encoder") + const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 40903db3..ca61dbc4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -15,7 +15,7 @@ import ( const MB = 1024 * 1024 // NSEC_IN_SEC nanoseconds in second -const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck +const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck type Supervised interface { Pool diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go new file mode 100644 index 00000000..caf8783f --- /dev/null +++ b/pkg/pubsub/interface.go @@ -0,0 +1,32 @@ +package pubsub + +// PubSub ... +type PubSub interface { + Publisher + Subscriber + Reader +} + +// Subscriber defines the ability to operate as message passing broker. +type Subscriber interface { + // Subscribe broker to one or multiple topics. + Subscribe(topics ...string) error + + // Unsubscribe from one or multiply topics + Unsubscribe(topics ...string) error +} + +// Publisher publish one or more messages +type Publisher interface { + // Publish one or multiple Channel. + Publish(messages []*Message) error + + // PublishAsync publish message and return immediately + // If error occurred it will be printed into the logger + PublishAsync(messages []*Message) +} + +// Reader interface should return next message +type Reader interface { + Next() (*Message, error) +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go new file mode 100644 index 00000000..c17d153b --- /dev/null +++ b/pkg/pubsub/message.go @@ -0,0 +1,24 @@ +package pubsub + +import ( + json "github.com/json-iterator/go" +) + +type Message struct { + // Command (join, leave, headers) + Command string `json:"command"` + + // Broker (redis, memory) + Broker string `json:"broker"` + + // Topic message been pushed into. + Topics []string `json:"topic"` + + // Payload to be broadcasted + Payload []byte `json:"payload"` +} + +// MarshalBinary needed to marshal message for the redis +func (m *Message) MarshalBinary() ([]byte, error) { + return json.Marshal(m) +} diff --git a/plugins/http/worker_handler/constants.go b/pkg/worker_handler/constants.go index 3355d9c2..3355d9c2 100644 --- a/plugins/http/worker_handler/constants.go +++ b/pkg/worker_handler/constants.go diff --git a/plugins/http/worker_handler/errors.go b/pkg/worker_handler/errors.go index 5fa8e64e..5fa8e64e 100644 --- a/plugins/http/worker_handler/errors.go +++ b/pkg/worker_handler/errors.go diff --git a/plugins/http/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go index 390cc7d1..390cc7d1 100644 --- a/plugins/http/worker_handler/errors_windows.go +++ b/pkg/worker_handler/errors_windows.go diff --git a/plugins/http/worker_handler/handler.go b/pkg/worker_handler/handler.go index be53fc12..e0d1aae0 100644 --- a/plugins/http/worker_handler/handler.go +++ b/pkg/worker_handler/handler.go @@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) { // mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - const op = errors.Op("http_plugin_serve_http") + const op = errors.Op("serve_http") start := time.Now() // validating request size @@ -202,16 +202,16 @@ func (h *Handler) resolveIP(r *Request) { // CF-Connecting-IP is an Enterprise feature and we check it last in order. // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string if r.Header.Get("X-Real-Ip") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) + r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip")) return } if r.Header.Get("True-Client-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) + r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP")) return } if r.Header.Get("CF-Connecting-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) + r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP")) } } diff --git a/plugins/http/worker_handler/parse.go b/pkg/worker_handler/parse.go index 2790da2a..2790da2a 100644 --- a/plugins/http/worker_handler/parse.go +++ b/pkg/worker_handler/parse.go diff --git a/plugins/http/worker_handler/request.go b/pkg/worker_handler/request.go index 178bc827..75ee8381 100644 --- a/plugins/http/worker_handler/request.go +++ b/pkg/worker_handler/request.go @@ -61,7 +61,7 @@ type Request struct { body interface{} } -func fetchIP(pair string) string { +func FetchIP(pair string) string { if !strings.ContainsRune(pair, ':') { return pair } @@ -73,10 +73,10 @@ func fetchIP(pair string) string { // NewRequest creates new PSR7 compatible request using net/http request. func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) { req := &Request{ - RemoteAddr: fetchIP(r.RemoteAddr), + RemoteAddr: FetchIP(r.RemoteAddr), Protocol: r.Proto, Method: r.Method, - URI: uri(r), + URI: URI(r), Header: r.Header, Cookies: make(map[string]string), RawQuery: r.URL.RawQuery, @@ -174,8 +174,8 @@ func (r *Request) contentType() int { return contentStream } -// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func uri(r *http.Request) string { +// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func URI(r *http.Request) string { if r.URL.Host != "" { return r.URL.String() } diff --git a/plugins/http/worker_handler/response.go b/pkg/worker_handler/response.go index 1763d304..1763d304 100644 --- a/plugins/http/worker_handler/response.go +++ b/pkg/worker_handler/response.go diff --git a/plugins/http/worker_handler/uploads.go b/pkg/worker_handler/uploads.go index e695000e..e695000e 100644 --- a/plugins/http/worker_handler/uploads.go +++ b/pkg/worker_handler/uploads.go diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 4625b7a7..29fa3640 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -23,9 +23,9 @@ type Watcher interface { // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all container w/o removing it from internal storage + // List return all container w/o removing it from internal storage List() []worker.BaseProcess - // RemoveWorker remove worker from the container + // Remove will remove worker from the container Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 5aec4ee6..557563ac 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead container will not be replaced +// NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ container: container.NewVector(numWorkers), @@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } } -// Warning, this is O(n) operation, and it will return copy of the actual workers +// List - this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() diff --git a/plugins/config/interface.go b/plugins/config/interface.go index 59ad981f..b3854e09 100644 --- a/plugins/config/interface.go +++ b/plugins/config/interface.go @@ -11,7 +11,7 @@ type Configurer interface { // } UnmarshalKey(name string, out interface{}) error - // Unmarshal unmarshals the config into a Struct. Make sure that the tags + // Unmarshal unmarshal the config into a Struct. Make sure that the tags // on the fields of the structure are properly set. Unmarshal(out interface{}) error @@ -24,6 +24,6 @@ type Configurer interface { // Has checks if config section exists. Has(name string) bool - // Returns General section. Read-only + // GetCommonConfig returns General section. Read-only GetCommonConfig() *General } diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go index 8b63395f..a1c2afa6 100644 --- a/plugins/http/config/http.go +++ b/plugins/http/config/http.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/errors" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/pool" ) // HTTP configures RoadRunner HTTP server. @@ -34,7 +34,7 @@ type HTTP struct { Uploads *Uploads `mapstructure:"uploads"` // Pool configures worker pool. - Pool *poolImpl.Config `mapstructure:"pool"` + Pool *pool.Config `mapstructure:"pool"` // Env is environment variables passed to the http pool Env map[string]string @@ -70,7 +70,7 @@ func (c *HTTP) EnableFCGI() bool { func (c *HTTP) InitDefaults() error { if c.Pool == nil { // default pool - c.Pool = &poolImpl.Config{ + c.Pool = &pool.Config{ Debug: false, NumWorkers: uint64(runtime.NumCPU()), MaxJobs: 0, diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 2b68bbe5..770ca8ca 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -7,16 +7,15 @@ import ( "net/http" "sync" - "github.com/hashicorp/go-multierror" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/worker" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/status" @@ -71,47 +70,47 @@ type Plugin struct { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { const op = errors.Op("http_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, err) } - err = s.cfg.InitDefaults() + err = p.cfg.InitDefaults() if err != nil { return errors.E(op, err) } // rr logger (via plugin) - s.log = rrLogger + p.log = rrLogger // use time and date in UTC format - s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) + p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) - s.mdwr = make(map[string]Middleware) + p.mdwr = make(map[string]Middleware) - if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() { + if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() { return errors.E(op, errors.Disabled) } // init if nil - if s.cfg.Env == nil { - s.cfg.Env = make(map[string]string) + if p.cfg.Env == nil { + p.cfg.Env = make(map[string]string) } - s.cfg.Env[RrMode] = "http" - s.server = server + p.cfg.Env[RrMode] = "http" + p.server = server return nil } -func (s *Plugin) logCallback(event interface{}) { +func (p *Plugin) logCallback(event interface{}) { if ev, ok := event.(handler.ResponseEvent); ok { - s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), + p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), "remote", ev.Request.RemoteAddr, "elapsed", ev.Elapsed().String(), ) @@ -119,60 +118,66 @@ func (s *Plugin) logCallback(event interface{}) { } // Serve serves the svc. -func (s *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 2) // run whole process in the goroutine go func() { // protect http initialization - s.Lock() - s.serve(errCh) - s.Unlock() + p.Lock() + p.serve(errCh) + p.Unlock() }() return errCh } -func (s *Plugin) serve(errCh chan error) { +func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { errCh <- errors.E(op, err) return } - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) if err != nil { errCh <- errors.E(op, err) return } - s.handler.AddListener(s.logCallback) + p.handler.AddListener(p.logCallback) - if s.cfg.EnableHTTP() { - if s.cfg.EnableH2C() { - s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog} + if p.cfg.EnableHTTP() { + if p.cfg.EnableH2C() { + p.http = &http.Server{ + Handler: h2c.NewHandler(p, &http2.Server{}), + ErrorLog: p.stdLog, + } } else { - s.http = &http.Server{Handler: s, ErrorLog: s.stdLog} + p.http = &http.Server{ + Handler: p, + ErrorLog: p.stdLog, + } } } - if s.cfg.EnableTLS() { - s.https = s.initSSL() - if s.cfg.SSLConfig.RootCA != "" { - err = s.appendRootCa() + if p.cfg.EnableTLS() { + p.https = p.initSSL() + if p.cfg.SSLConfig.RootCA != "" { + err = p.appendRootCa() if err != nil { errCh <- errors.E(op, err) return @@ -180,102 +185,95 @@ func (s *Plugin) serve(errCh chan error) { } // if HTTP2Config not nil - if s.cfg.HTTP2Config != nil { - if err := s.initHTTP2(); err != nil { + if p.cfg.HTTP2Config != nil { + if err := p.initHTTP2(); err != nil { errCh <- errors.E(op, err) return } } } - if s.cfg.EnableFCGI() { - s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog} + if p.cfg.EnableFCGI() { + p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog} } // start http, https and fcgi servers if requested in the config go func() { - s.serveHTTP(errCh) + p.serveHTTP(errCh) }() go func() { - s.serveHTTPS(errCh) + p.serveHTTPS(errCh) }() go func() { - s.serveFCGI(errCh) + p.serveFCGI(errCh) }() } // Stop stops the http. -func (s *Plugin) Stop() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Stop() error { + p.Lock() + defer p.Unlock() - var err error - if s.fcgi != nil { - err = s.fcgi.Shutdown(context.Background()) + if p.fcgi != nil { + err := p.fcgi.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the fcgi server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) + p.log.Error("fcgi shutdown", "error", err) } } - if s.https != nil { - err = s.https.Shutdown(context.Background()) + if p.https != nil { + err := p.https.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the https server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) + p.log.Error("https shutdown", "error", err) } } - if s.http != nil { - err = s.http.Shutdown(context.Background()) + if p.http != nil { + err := p.http.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the http server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) + p.log.Error("http shutdown", "error", err) } } // check for safety - if s.pool != nil { - s.pool.Destroy(context.Background()) + if p.pool != nil { + p.pool.Destroy(context.Background()) } - return err + return nil } // ServeHTTP handles connection using set of middleware and pool PSR-7 server. -func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if headerContainsUpgrade(r) { http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) return } - if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect { - s.redirect(w, r) + if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect { + p.redirect(w, r) return } - if s.https != nil && r.TLS != nil { + if p.https != nil && r.TLS != nil { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } r = attributes.Init(r) // protect the case, when user sendEvent Reset and we are replacing handler with pool - s.RLock() - s.handler.ServeHTTP(w, r) - s.RUnlock() + p.RLock() + p.handler.ServeHTTP(w, r) + p.RUnlock() } // Workers returns slice with the process states for the workers -func (s *Plugin) Workers() []process.State { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Workers() []process.State { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() ps := make([]process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { @@ -290,74 +288,75 @@ func (s *Plugin) Workers() []process.State { } // internal -func (s *Plugin) workers() []worker.BaseProcess { - return s.pool.Workers() +func (p *Plugin) workers() []worker.BaseProcess { + return p.pool.Workers() } // Name returns endure.Named interface implementation -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Reset destroys the old pool and replaces it with new one, waiting for old pool to die -func (s *Plugin) Reset() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Reset() error { + p.Lock() + defer p.Unlock() const op = errors.Op("http_plugin_reset") - s.log.Info("HTTP plugin got restart request. Restarting...") - s.pool.Destroy(context.Background()) - s.pool = nil + p.log.Info("HTTP plugin got restart request. Restarting...") + p.pool.Destroy(context.Background()) + p.pool = nil var err error - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { return errors.E(op, err) } - s.log.Info("HTTP workers Pool successfully restarted") + p.log.Info("HTTP workers Pool successfully restarted") - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) + if err != nil { return errors.E(op, err) } - s.log.Info("HTTP handler listeners successfully re-added") - s.handler.AddListener(s.logCallback) + p.log.Info("HTTP handler listeners successfully re-added") + p.handler.AddListener(p.logCallback) - s.log.Info("HTTP plugin successfully restarted") + p.log.Info("HTTP plugin successfully restarted") return nil } // Collects collecting http middlewares -func (s *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - s.AddMiddleware, + p.AddMiddleware, } } // AddMiddleware is base requirement for the middleware (name and Middleware) -func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { - s.mdwr[name.Name()] = m +func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) { + p.mdwr[name.Name()] = m } // Status return status of the particular plugin -func (s *Plugin) Status() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Status() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { if workers[i].State().IsActive() { return status.Status{ @@ -372,14 +371,14 @@ func (s *Plugin) Status() status.Status { } // Ready return readiness status of the particular plugin -func (s *Plugin) Ready() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Ready() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) - // we assume, that plugin's worker pool is ready + // we assume, that plugin'p worker pool is ready if workers[i].State().Value() == worker.StateReady { return status.Status{ Code: http.StatusOK, @@ -393,4 +392,4 @@ func (s *Plugin) Ready() status.Status { } // Available interface implementation -func (s *Plugin) Available() {} +func (p *Plugin) Available() {} diff --git a/plugins/http/serve.go b/plugins/http/serve.go index 78796322..bf1ccafe 100644 --- a/plugins/http/serve.go +++ b/plugins/http/serve.go @@ -17,46 +17,46 @@ import ( "golang.org/x/sys/cpu" ) -func (s *Plugin) serveHTTP(errCh chan error) { - if s.http == nil { +func (p *Plugin) serveHTTP(errCh chan error) { + if p.http == nil { return } - const op = errors.Op("http_plugin_serve_http") + const op = errors.Op("serveHTTP") - if len(s.mdwr) > 0 { - applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log) + if len(p.mdwr) > 0 { + applyMiddlewares(p.http, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.Address) + l, err := utils.CreateListener(p.cfg.Address) if err != nil { errCh <- errors.E(op, err) return } - err = s.http.Serve(l) + err = p.http.Serve(l) if err != nil && err != http.ErrServerClosed { errCh <- errors.E(op, err) return } } -func (s *Plugin) serveHTTPS(errCh chan error) { - if s.https == nil { +func (p *Plugin) serveHTTPS(errCh chan error) { + if p.https == nil { return } - const op = errors.Op("http_plugin_serve_https") - if len(s.mdwr) > 0 { - applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) + const op = errors.Op("serveHTTPS") + if len(p.mdwr) > 0 { + applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.SSLConfig.Address) + l, err := utils.CreateListener(p.cfg.SSLConfig.Address) if err != nil { errCh <- errors.E(op, err) return } - err = s.https.ServeTLS( + err = p.https.ServeTLS( l, - s.cfg.SSLConfig.Cert, - s.cfg.SSLConfig.Key, + p.cfg.SSLConfig.Cert, + p.cfg.SSLConfig.Key, ) if err != nil && err != http.ErrServerClosed { @@ -66,34 +66,34 @@ func (s *Plugin) serveHTTPS(errCh chan error) { } // serveFCGI starts FastCGI server. -func (s *Plugin) serveFCGI(errCh chan error) { - if s.fcgi == nil { +func (p *Plugin) serveFCGI(errCh chan error) { + if p.fcgi == nil { return } - const op = errors.Op("http_plugin_serve_fcgi") + const op = errors.Op("serveFCGI") - if len(s.mdwr) > 0 { - applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) + if len(p.mdwr) > 0 { + applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.FCGIConfig.Address) + l, err := utils.CreateListener(p.cfg.FCGIConfig.Address) if err != nil { errCh <- errors.E(op, err) return } - err = fcgi.Serve(l, s.fcgi.Handler) + err = fcgi.Serve(l, p.fcgi.Handler) if err != nil && err != http.ErrServerClosed { errCh <- errors.E(op, err) return } } -func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) { +func (p *Plugin) redirect(w http.ResponseWriter, r *http.Request) { target := &url.URL{ Scheme: HTTPSScheme, // host or host:port - Host: s.tlsAddr(r.Host, false), + Host: p.tlsAddr(r.Host, false), Path: r.URL.Path, RawQuery: r.URL.RawQuery, } @@ -111,7 +111,7 @@ func headerContainsUpgrade(r *http.Request) bool { } // append RootCA to the https server TLS config -func (s *Plugin) appendRootCa() error { +func (p *Plugin) appendRootCa() error { const op = errors.Op("http_plugin_append_root_ca") rootCAs, err := x509.SystemCertPool() if err != nil { @@ -121,7 +121,7 @@ func (s *Plugin) appendRootCa() error { rootCAs = x509.NewCertPool() } - CA, err := os.ReadFile(s.cfg.SSLConfig.RootCA) + CA, err := os.ReadFile(p.cfg.SSLConfig.RootCA) if err != nil { return err } @@ -137,13 +137,13 @@ func (s *Plugin) appendRootCa() error { InsecureSkipVerify: false, RootCAs: rootCAs, } - s.http.TLSConfig = cfg + p.http.TLSConfig = cfg return nil } // Init https server -func (s *Plugin) initSSL() *http.Server { +func (p *Plugin) initSSL() *http.Server { var topCipherSuites []uint16 var defaultCipherSuitesTLS13 []uint16 @@ -193,9 +193,9 @@ func (s *Plugin) initSSL() *http.Server { DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) sslServer := &http.Server{ - Addr: s.tlsAddr(s.cfg.Address, true), - Handler: s, - ErrorLog: s.stdLog, + Addr: p.tlsAddr(p.cfg.Address, true), + Handler: p, + ErrorLog: p.stdLog, TLSConfig: &tls.Config{ CurvePreferences: []tls.CurveID{ tls.CurveP256, @@ -213,19 +213,19 @@ func (s *Plugin) initSSL() *http.Server { } // init http/2 server -func (s *Plugin) initHTTP2() error { - return http2.ConfigureServer(s.https, &http2.Server{ - MaxConcurrentStreams: s.cfg.HTTP2Config.MaxConcurrentStreams, +func (p *Plugin) initHTTP2() error { + return http2.ConfigureServer(p.https, &http2.Server{ + MaxConcurrentStreams: p.cfg.HTTP2Config.MaxConcurrentStreams, }) } // tlsAddr replaces listen or host port with port configured by SSLConfig config. -func (s *Plugin) tlsAddr(host string, forcePort bool) string { +func (p *Plugin) tlsAddr(host string, forcePort bool) string { // remove current forcePort first host = strings.Split(host, ":")[0] - if forcePort || s.cfg.SSLConfig.Port != 443 { - host = fmt.Sprintf("%s:%v", host, s.cfg.SSLConfig.Port) + if forcePort || p.cfg.SSLConfig.Port != 443 { + host = fmt.Sprintf("%s:%v", host, p.cfg.SSLConfig.Port) } return host diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 2e2df527..0f647cb1 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" ) @@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit if b == nil { return errors.E(op, errors.NoSuchBucket) } - err := b.Delete([]byte(k)) + err := b.Delete(utils.AsBytes(k)) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 17b06fa0..02281ed5 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error { memcachedItem := &memcache.Item{ Key: items[i].Key, // unsafe convert - Value: []byte(items[i].Value), + Value: utils.AsBytes(items[i].Value), Flags: 0, } diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index 1e0d03d4..c2494ee7 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return []byte(data.(kv.Item).Value), nil + return utils.AsBytes(data.(kv.Item).Value), nil } return nil, nil } diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go index d2183411..3694c5a7 100644 --- a/plugins/kv/drivers/redis/plugin.go +++ b/plugins/kv/drivers/redis/plugin.go @@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - return make(chan error, 1) + return make(chan error) } func (s *Plugin) Stop() error { diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index fe2fa10b..9a609735 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -90,9 +90,9 @@ func (p *Plugin) Serve() chan error { return errCh } - // config key for the particular sub-driver + // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) - // at this point we know, that driver field present in the cofiguration + // at this point we know, that driver field present in the configuration switch v.(map[string]interface{})[driver] { case memcached: if _, ok := p.drivers[memcached]; !ok { diff --git a/plugins/logger/interface.go b/plugins/logger/interface.go index 5bb2143b..827f9821 100644 --- a/plugins/logger/interface.go +++ b/plugins/logger/interface.go @@ -8,7 +8,7 @@ type Logger interface { Error(msg string, keyvals ...interface{}) } -// With creates a child logger and adds structured context to it +// WithLogger creates a child logger and adds structured context to it type WithLogger interface { With(keyvals ...interface{}) Logger } diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go index 484cc23e..479aa565 100644 --- a/plugins/logger/std_log_adapter.go +++ b/plugins/logger/std_log_adapter.go @@ -1,7 +1,7 @@ package logger import ( - "unsafe" + "github.com/spiral/roadrunner/v2/utils" ) // StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output @@ -12,7 +12,7 @@ type StdLogAdapter struct { // Write io.Writer interface implementation func (s *StdLogAdapter) Write(p []byte) (n int, err error) { - s.log.Error("server internal error", "message", toString(p)) + s.log.Error("server internal error", "message", utils.AsString(p)) return len(p), nil } @@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter { return logAdapter } - -// unsafe, but lightning fast []byte to string conversion -func toString(data []byte) string { - return *(*string)(unsafe.Pointer(&data)) -} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go new file mode 100644 index 00000000..49c187bc --- /dev/null +++ b/plugins/memory/plugin.go @@ -0,0 +1,81 @@ +package memory + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "memory" +) + +type Plugin struct { + log logger.Logger + + // channel with the messages from the RPC + pushCh chan *pubsub.Message + // user-subscribed topics + topics sync.Map +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pushCh = make(chan *pubsub.Message, 100) + return nil +} + +// Available interface implementation for the plugin +func (p *Plugin) Available() {} + +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Publish(messages []*pubsub.Message) error { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + return nil +} + +func (p *Plugin) PublishAsync(messages []*pubsub.Message) { + go func() { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + }() +} + +func (p *Plugin) Subscribe(topics ...string) error { + for i := 0; i < len(topics); i++ { + p.topics.Store(topics[i], struct{}{}) + } + return nil +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + for i := 0; i < len(topics); i++ { + p.topics.Delete(topics[i]) + } + return nil +} + +func (p *Plugin) Next() (*pubsub.Message, error) { + msg := <-p.pushCh + + if msg == nil { + return nil, nil + } + + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(msg.Topics); i++ { + if _, ok := p.topics.Load(msg.Topics[i]); ok { + return msg, nil + } + } + return nil, nil +} diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go new file mode 100644 index 00000000..93b13124 --- /dev/null +++ b/plugins/redis/fanin.go @@ -0,0 +1,100 @@ +package redis + +import ( + "context" + "sync" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/utils" +) + +type FanIn struct { + sync.Mutex + + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan *pubsub.Message + + exit chan struct{} +} + +func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { + out := make(chan *pubsub.Message, 100) + fi := &FanIn{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (fi *FanIn) AddChannel(topics ...string) error { + const op = errors.Op("fanin_addchannel") + err := fi.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (fi *FanIn) read() { + for { + select { + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the websockets.Msg interface + // payload should be in the redis.message.payload field + + case msg, ok := <-fi.pubsub.Channel(): + // channel closed + if !ok { + return + } + m := &pubsub.Message{} + err := json.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("failed to unmarshal payload", "error", err.Error()) + continue + } + + fi.out <- m + case <-fi.exit: + return + } + } +} + +func (fi *FanIn) RemoveChannel(topics ...string) error { + const op = errors.Op("fanin_remove") + err := fi.pubsub.Unsubscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (fi *FanIn) Stop() error { + fi.exit <- struct{}{} + close(fi.out) + close(fi.exit) + return nil +} + +func (fi *FanIn) Consume() <-chan *pubsub.Message { + return fi.out +} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 2eab7043..c1480de8 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -1,8 +1,12 @@ package redis import ( + "context" + "sync" + "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -10,73 +14,133 @@ import ( const PluginName = "redis" type Plugin struct { + sync.Mutex // config for RR integration cfg *Config // logger log logger.Logger // redis universal client universalClient redis.UniversalClient + + fanin *FanIn } -func (s *Plugin) GetClient() redis.UniversalClient { - return s.universalClient +func (p *Plugin) GetClient() redis.UniversalClient { + return p.universalClient } -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("redis_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - s.cfg.InitDefaults() - s.log = log - - s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: s.cfg.Addrs, - DB: s.cfg.DB, - Username: s.cfg.Username, - Password: s.cfg.Password, - SentinelPassword: s.cfg.SentinelPassword, - MaxRetries: s.cfg.MaxRetries, - MinRetryBackoff: s.cfg.MaxRetryBackoff, - MaxRetryBackoff: s.cfg.MaxRetryBackoff, - DialTimeout: s.cfg.DialTimeout, - ReadTimeout: s.cfg.ReadTimeout, - WriteTimeout: s.cfg.WriteTimeout, - PoolSize: s.cfg.PoolSize, - MinIdleConns: s.cfg.MinIdleConns, - MaxConnAge: s.cfg.MaxConnAge, - PoolTimeout: s.cfg.PoolTimeout, - IdleTimeout: s.cfg.IdleTimeout, - IdleCheckFrequency: s.cfg.IdleCheckFreq, - ReadOnly: s.cfg.ReadOnly, - RouteByLatency: s.cfg.RouteByLatency, - RouteRandomly: s.cfg.RouteRandomly, - MasterName: s.cfg.MasterName, + p.cfg.InitDefaults() + p.log = log + + p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: p.cfg.Addrs, + DB: p.cfg.DB, + Username: p.cfg.Username, + Password: p.cfg.Password, + SentinelPassword: p.cfg.SentinelPassword, + MaxRetries: p.cfg.MaxRetries, + MinRetryBackoff: p.cfg.MaxRetryBackoff, + MaxRetryBackoff: p.cfg.MaxRetryBackoff, + DialTimeout: p.cfg.DialTimeout, + ReadTimeout: p.cfg.ReadTimeout, + WriteTimeout: p.cfg.WriteTimeout, + PoolSize: p.cfg.PoolSize, + MinIdleConns: p.cfg.MinIdleConns, + MaxConnAge: p.cfg.MaxConnAge, + PoolTimeout: p.cfg.PoolTimeout, + IdleTimeout: p.cfg.IdleTimeout, + IdleCheckFrequency: p.cfg.IdleCheckFreq, + ReadOnly: p.cfg.ReadOnly, + RouteByLatency: p.cfg.RouteByLatency, + RouteRandomly: p.cfg.RouteRandomly, + MasterName: p.cfg.MasterName, }) + // init fanin + p.fanin = NewFanIn(p.universalClient, log) + return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) +func (p *Plugin) Serve() chan error { + errCh := make(chan error) return errCh } -func (s Plugin) Stop() error { - return s.universalClient.Close() +func (p *Plugin) Stop() error { + const op = errors.Op("redis_plugin_stop") + err := p.fanin.Stop() + if err != nil { + return errors.E(op, err) + } + + err = p.universalClient.Close() + if err != nil { + return errors.E(op, err) + } + + return nil } -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Available interface implementation -func (s *Plugin) Available() { +func (p *Plugin) Available() {} + +func (p *Plugin) Publish(msg []*pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) + if f.Err() != nil { + return f.Err() + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []*pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) + continue + } + } + } + }() +} + +func (p *Plugin) Subscribe(topics ...string) error { + return p.fanin.AddChannel(topics...) +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + return p.fanin.RemoveChannel(topics...) +} + +// Next return next message +func (p *Plugin) Next() (*pubsub.Message, error) { + return <-p.fanin.Consume(), nil } diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go index 421668b3..c40c2fdf 100644 --- a/plugins/reload/watcher.go +++ b/plugins/reload/watcher.go @@ -305,7 +305,7 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) { w.mu.Lock() defer w.mu.Unlock() - // Store create and remove events for use to check for rename events. + // InsertMany create and remove events for use to check for rename events. creates := make(map[string]os.FileInfo) removes := make(map[string]os.FileInfo) diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 22f02685..0424d52d 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -14,7 +14,10 @@ type Env map[string]string // Server creates workers for the application. type Server interface { + // CmdFactory return a new command based on the .rr.yaml server.command section CmdFactory(env Env) (func() *exec.Cmd, error) + // NewWorker return a new worker with provided and attached by the user listeners and environment variables NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) + // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index ef77f7ab..aab9dcde 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -25,9 +25,9 @@ import ( const PluginName = "server" // RR_RELAY env variable key (internal) -const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck +const RR_RELAY = "RR_RELAY" //nolint:stylecheck // RR_RPC env variable key (internal) if the RPC presents -const RR_RPC = "RR_RPC" //nolint:golint,stylecheck +const RR_RPC = "RR_RPC" //nolint:stylecheck // Plugin manages worker type Plugin struct { diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go new file mode 100644 index 00000000..18c63be3 --- /dev/null +++ b/plugins/websockets/commands/enums.go @@ -0,0 +1,9 @@ +package commands + +type Command string + +const ( + Leave string = "leave" + Join string = "join" + Headers string = "headers" +) diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go new file mode 100644 index 00000000..be4aaa82 --- /dev/null +++ b/plugins/websockets/config.go @@ -0,0 +1,58 @@ +package websockets + +import ( + "time" + + "github.com/spiral/roadrunner/v2/pkg/pool" +) + +/* +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + + pubsubs:["redis", "amqp", "memory"] + # path used as websockets path + path: "/ws" +*/ + +// Config represents configuration for the ws plugin +type Config struct { + // http path for the websocket + Path string `mapstructure:"path"` + // ["redis", "amqp", "memory"] + PubSubs []string `mapstructure:"pubsubs"` + Middleware []string `mapstructure:"middleware"` + + Pool *pool.Config `mapstructure:"pool"` +} + +// InitDefault initialize default values for the ws config +func (c *Config) InitDefault() { + if c.Path == "" { + c.Path = "/ws" + } + if len(c.PubSubs) == 0 { + // memory used by default + c.PubSubs = append(c.PubSubs, "memory") + } + + if c.Pool == nil { + c.Pool = &pool.Config{} + if c.Pool.NumWorkers == 0 { + // 2 workers by default + c.Pool.NumWorkers = 2 + } + + if c.Pool.AllocateTimeout == 0 { + c.Pool.AllocateTimeout = time.Minute + } + + if c.Pool.DestroyTimeout == 0 { + c.Pool.DestroyTimeout = time.Minute + } + if c.Pool.Supervisor == nil { + return + } + c.Pool.Supervisor.InitDefaults() + } +} diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go new file mode 100644 index 00000000..2b847173 --- /dev/null +++ b/plugins/websockets/connection/connection.go @@ -0,0 +1,67 @@ +package connection + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Connection represents wrapped and safe to use from the different threads websocket connection +type Connection struct { + sync.RWMutex + log logger.Logger + conn *websocket.Conn +} + +func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { + return &Connection{ + conn: wsConn, + log: log, + } +} + +func (c *Connection) Write(mt int, data []byte) error { + c.Lock() + defer c.Unlock() + + const op = errors.Op("websocket_write") + // handle a case when a goroutine tried to write into the closed connection + defer func() { + if r := recover(); r != nil { + c.log.Warn("panic handled, tried to write into the closed connection") + } + }() + + err := c.conn.WriteMessage(mt, data) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *Connection) Read() (int, []byte, error) { + const op = errors.Op("websocket_read") + + mt, data, err := c.conn.ReadMessage() + if err != nil { + return -1, nil, errors.E(op, err) + } + + return mt, data, nil +} + +func (c *Connection) Close() error { + c.Lock() + defer c.Unlock() + const op = errors.Op("websocket_close") + + err := c.conn.Close() + if err != nil { + return errors.E(op, err) + } + + return nil +} diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio new file mode 100644 index 00000000..230870f2 --- /dev/null +++ b/plugins/websockets/doc/broadcast.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-27T20:56:56.848Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="Pt0MY_-SPz7R7foQA1VL" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go new file mode 100644 index 00000000..fc214be8 --- /dev/null +++ b/plugins/websockets/doc/doc.go @@ -0,0 +1,27 @@ +package doc + +/* +RPC message structure: + +type Msg struct { + // Topic message been pushed into. + Topics_ []string `json:"topic"` + + // Command (join, leave, headers) + Command_ string `json:"command"` + + // Broker (redis, memory) + Broker_ string `json:"broker"` + + // Payload to be broadcasted + Payload_ []byte `json:"payload"` +} + +1. Topics - string array (slice) with topics to join or leave +2. Command - string, command to apply on the provided topics +3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node - +`redis` broker should be used. +4. Payload - raw byte array to send to the subscribers (binary messages). + + +*/ diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go new file mode 100644 index 00000000..24ea19ce --- /dev/null +++ b/plugins/websockets/executor/executor.go @@ -0,0 +1,226 @@ +package executor + +import ( + "fmt" + "net/http" + "sync" + + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/commands" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/plugins/websockets/validator" +) + +type Response struct { + Topic string `json:"topic"` + Payload []string `json:"payload"` +} + +type Executor struct { + sync.Mutex + conn *connection.Connection + storage *storage.Storage + log logger.Logger + + // associated connection ID + connID string + + // map with the pubsub drivers + pubsub map[string]pubsub.PubSub + actualTopics map[string]struct{} + + req *http.Request + accessValidator validator.AccessValidatorFn +} + +// NewExecutor creates protected connection and starts command loop +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, + connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { + return &Executor{ + conn: conn, + connID: connID, + storage: bst, + log: log, + pubsub: pubsubs, + accessValidator: av, + actualTopics: make(map[string]struct{}, 10), + req: r, + } +} + +func (e *Executor) StartCommandLoop() error { //nolint:gocognit + const op = errors.Op("executor_command_loop") + for { + mt, data, err := e.conn.Read() + if err != nil { + if mt == -1 { + e.log.Info("socket was closed", "reason", err, "message type", mt) + return nil + } + + return errors.E(op, err) + } + + msg := &pubsub.Message{} + + err = json.Unmarshal(data, msg) + if err != nil { + e.log.Error("error unmarshal message", "error", err) + continue + } + + // nil message, continue + if msg == nil { + e.log.Warn("get nil message, skipping") + continue + } + + switch msg.Command { + // handle leave + case commands.Join: + e.log.Debug("get join command", "msg", msg) + + val, err := e.accessValidator(e.req, msg.Topics...) + if err != nil { + if val != nil { + e.log.Debug("validation error", "status", val.Status, "headers", val.Header, "body", val.Body) + } + + resp := &Response{ + Topic: "#join", + Payload: msg.Topics, + } + + packet, errJ := json.Marshal(resp) + if errJ != nil { + e.log.Error("error marshal the body", "error", errJ) + return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) + } + + errW := e.conn.Write(websocket.BinaryMessage, packet) + if errW != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", errW) + return errors.E(op, fmt.Errorf("%v,%v", err, errW)) + } + + continue + } + + resp := &Response{ + Topic: "@join", + Payload: msg.Topics, + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + return errors.E(op, err) + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + return errors.E(op, err) + } + + // subscribe to the topic + if br, ok := e.pubsub[msg.Broker]; ok { + err = e.Set(br, msg.Topics) + if err != nil { + return errors.E(op, err) + } + } + + // handle leave + case commands.Leave: + e.log.Debug("get leave command", "msg", msg) + + // prepare response + resp := &Response{ + Topic: "@leave", + Payload: msg.Topics, + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + return errors.E(op, err) + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + return errors.E(op, err) + } + + if br, ok := e.pubsub[msg.Broker]; ok { + err = e.Leave(br, msg.Topics) + if err != nil { + return errors.E(op, err) + } + } + + case commands.Headers: + + default: + e.log.Warn("unknown command", "command", msg.Command) + } + } +} + +func (e *Executor) Set(br pubsub.PubSub, topics []string) error { + // associate connection with topics + err := br.Subscribe(topics...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + // in case of error, unsubscribe connection from the dead topics + _ = br.Unsubscribe(topics...) + return err + } + + e.storage.InsertMany(e.connID, topics) + + // save topics for the connection + for i := 0; i < len(topics); i++ { + e.actualTopics[topics[i]] = struct{}{} + } + + return nil +} + +func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { + // remove associated connections from the storage + e.storage.RemoveMany(e.connID, topics) + err := br.Unsubscribe(topics...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + return err + } + + // remove topics for the connection + for i := 0; i < len(topics); i++ { + delete(e.actualTopics, topics[i]) + } + + return nil +} + +func (e *Executor) CleanUp() { + for topic := range e.actualTopics { + // remove from the bst + e.storage.Remove(e.connID, topic) + + for _, ps := range e.pubsub { + _ = ps.Unsubscribe(topic) + } + } + + for k := range e.actualTopics { + delete(e.actualTopics, k) + } +} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go new file mode 100644 index 00000000..9b21ff8f --- /dev/null +++ b/plugins/websockets/plugin.go @@ -0,0 +1,386 @@ +package websockets + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/fasthttp/websocket" + "github.com/google/uuid" + json "github.com/json-iterator/go" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/payload" + phpPool "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/http/attributes" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/pool" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/plugins/websockets/validator" +) + +const ( + PluginName string = "websockets" +) + +type Plugin struct { + sync.RWMutex + // Collection with all available pubsubs + pubsubs map[string]pubsub.PubSub + + cfg *Config + log logger.Logger + + // global connections map + connections sync.Map + storage *storage.Storage + + // GO workers pool + workersPool *pool.WorkersPool + + wsUpgrade *websocket.Upgrader + serveExit chan struct{} + + phpPool phpPool.Pool + server server.Server + + // function used to validate access to the requested resource + accessValidator validator.AccessValidatorFn +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { + const op = errors.Op("websockets_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, err) + } + + p.cfg.InitDefault() + + p.pubsubs = make(map[string]pubsub.PubSub) + p.log = log + p.storage = storage.NewStorage() + p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) + p.wsUpgrade = &websocket.Upgrader{ + HandshakeTimeout: time.Second * 60, + } + p.serveExit = make(chan struct{}) + p.server = server + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error) + + go func() { + var err error + p.Lock() + defer p.Unlock() + + p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, map[string]string{"RR_MODE": "http"}) + if err != nil { + errCh <- err + } + + p.accessValidator = p.defaultAccessValidator(p.phpPool) + }() + + // run all pubsubs drivers + for _, v := range p.pubsubs { + go func(ps pubsub.PubSub) { + for { + select { + case <-p.serveExit: + return + default: + data, err := ps.Next() + if err != nil { + errCh <- err + return + } + p.workersPool.Queue(data) + } + } + }(v) + } + + return errCh +} + +func (p *Plugin) Stop() error { + // close workers pool + p.workersPool.Stop() + p.Lock() + p.phpPool.Destroy(context.Background()) + p.Unlock() + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.GetPublishers, + } +} + +func (p *Plugin) Available() {} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + plugin: p, + log: p.log, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +// GetPublishers collects all pubsubs +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { + p.pubsubs[name.Name()] = pub +} + +func (p *Plugin) Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != p.cfg.Path { + next.ServeHTTP(w, r) + return + } + + // we need to lock here, because accessValidator might not be set in the Serve func at the moment + p.RLock() + // before we hijacked connection, we still can write to the response headers + val, err := p.accessValidator(r) + p.RUnlock() + if err != nil { + p.log.Error("validation error") + w.WriteHeader(400) + return + } + + if val.Status != http.StatusOK { + for k, v := range val.Header { + for i := 0; i < len(v); i++ { + w.Header().Add(k, v[i]) + } + } + w.WriteHeader(val.Status) + _, _ = w.Write(val.Body) + return + } + + // upgrade connection to websocket connection + _conn, err := p.wsUpgrade.Upgrade(w, r, nil) + if err != nil { + // connection hijacked, do not use response.writer or request + p.log.Error("upgrade connection", "error", err) + return + } + + // construct safe connection protected by mutexes + safeConn := connection.NewConnection(_conn, p.log) + // generate UUID from the connection + connectionID := uuid.NewString() + // store connection + p.connections.Store(connectionID, safeConn) + + defer func() { + // close the connection on exit + err = safeConn.Close() + if err != nil { + p.log.Error("connection close", "error", err) + } + + // when exiting - delete the connection + p.connections.Delete(connectionID) + }() + + // Executor wraps a connection to have a safe abstraction + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r) + p.log.Info("websocket client connected", "uuid", connectionID) + defer e.CleanUp() + + err = e.StartCommandLoop() + if err != nil { + p.log.Error("command loop error, disconnecting", "error", err.Error()) + return + } + + p.log.Info("disconnected", "connectionID", connectionID) + }) +} + +// Workers returns slice with the process states for the workers +func (p *Plugin) Workers() []process.State { + p.RLock() + defer p.RUnlock() + + workers := p.workers() + + ps := make([]process.State, 0, len(workers)) + for i := 0; i < len(workers); i++ { + state, err := process.WorkerProcessState(workers[i]) + if err != nil { + return nil + } + ps = append(ps, state) + } + + return ps +} + +// internal +func (p *Plugin) workers() []worker.BaseProcess { + return p.phpPool.Workers() +} + +// Reset destroys the old pool and replaces it with new one, waiting for old pool to die +func (p *Plugin) Reset() error { + p.Lock() + defer p.Unlock() + const op = errors.Op("ws_plugin_reset") + p.log.Info("WS plugin got restart request. Restarting...") + p.phpPool.Destroy(context.Background()) + p.phpPool = nil + + var err error + p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, map[string]string{"RR_MODE": "http"}) + if err != nil { + return errors.E(op, err) + } + + // attach validators + p.accessValidator = p.defaultAccessValidator(p.phpPool) + + p.log.Info("WS plugin successfully restarted") + return nil +} + +// Publish is an entry point to the websocket PUBSUB +func (p *Plugin) Publish(msg []*pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics); j++ { + if br, ok := p.pubsubs[msg[i].Broker]; ok { + err := br.Publish(msg) + if err != nil { + return errors.E(err) + } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []*pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics); j++ { + err := p.pubsubs[msg[i].Broker].Publish(msg) + if err != nil { + p.log.Error("publish async error", "error", err) + return + } + } + } + }() +} + +func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { + return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { + p.RLock() + defer p.RUnlock() + const op = errors.Op("access_validator") + + p.log.Debug("validation", "topics", topics) + r = attributes.Init(r) + + // if channels len is eq to 0, we use serverValidator + if len(topics) == 0 { + ctx, err := validator.ServerAccessValidator(r) + if err != nil { + return nil, errors.E(op, err) + } + + val, err := exec(ctx, pool) + if err != nil { + return nil, errors.E(err) + } + + return val, nil + } + + ctx, err := validator.TopicsAccessValidator(r, topics...) + if err != nil { + return nil, errors.E(op, err) + } + + val, err := exec(ctx, pool) + if err != nil { + return nil, errors.E(op) + } + + if val.Status != http.StatusOK { + return val, errors.E(op, errors.Errorf("access forbidden, code: %d", val.Status)) + } + + return val, nil + } +} + +// go:inline +func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { + const op = errors.Op("exec") + pd := payload.Payload{ + Context: ctx, + } + + resp, err := pool.Exec(pd) + if err != nil { + return nil, errors.E(op, err) + } + + val := &validator.AccessValidator{ + Body: resp.Body, + } + + err = json.Unmarshal(resp.Context, val) + if err != nil { + return nil, errors.E(op, err) + } + + return val, nil +} diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go new file mode 100644 index 00000000..8f18580f --- /dev/null +++ b/plugins/websockets/pool/workers_pool.go @@ -0,0 +1,117 @@ +package pool + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type WorkersPool struct { + storage *storage.Storage + connections *sync.Map + resPool sync.Pool + log logger.Logger + + queue chan *pubsub.Message + exit chan struct{} +} + +// NewWorkersPool constructs worker pool for the websocket connections +func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { + wp := &WorkersPool{ + connections: connections, + queue: make(chan *pubsub.Message, 100), + storage: storage, + log: log, + exit: make(chan struct{}), + } + + wp.resPool.New = func() interface{} { + return make(map[string]struct{}, 10) + } + + // start 10 workers + for i := 0; i < 10; i++ { + wp.do() + } + + return wp +} + +func (wp *WorkersPool) Queue(msg *pubsub.Message) { + wp.queue <- msg +} + +func (wp *WorkersPool) Stop() { + for i := 0; i < 10; i++ { + wp.exit <- struct{}{} + } + + close(wp.exit) +} + +func (wp *WorkersPool) put(res map[string]struct{}) { + // optimized + // https://go-review.googlesource.com/c/go/+/110055/ + // not O(n), but O(1) + for k := range res { + delete(res, k) + } +} + +func (wp *WorkersPool) get() map[string]struct{} { + return wp.resPool.Get().(map[string]struct{}) +} + +func (wp *WorkersPool) do() { //nolint:gocognit + go func() { + for { + select { + case msg, ok := <-wp.queue: + if !ok { + return + } + // do not handle nil's + if msg == nil { + continue + } + if len(msg.Topics) == 0 { + continue + } + res := wp.get() + // get connections for the particular topic + wp.storage.GetByPtr(msg.Topics, res) + if len(res) == 0 { + wp.log.Info("no such topic", "topic", msg.Topics) + wp.put(res) + continue + } + + for i := range res { + c, ok := wp.connections.Load(i) + if !ok { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) + continue + } + + conn := c.(*connection.Connection) + err := conn.Write(websocket.BinaryMessage, msg.Payload) + if err != nil { + wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) + wp.put(res) + continue + } + } + + wp.put(res) + case <-wp.exit: + wp.log.Info("get exit signal, exiting from the workers pool") + return + } + } + }() +} diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go new file mode 100644 index 00000000..2fb0f1b9 --- /dev/null +++ b/plugins/websockets/rpc.go @@ -0,0 +1,47 @@ +package websockets + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// rpc collectors struct +type rpc struct { + plugin *Plugin + log logger.Logger +} + +func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { + const op = errors.Op("broadcast_publish") + r.log.Debug("message published", "msg", msg) + + // just return in case of nil message + if msg == nil { + *ok = true + return nil + } + + err := r.plugin.Publish(msg) + if err != nil { + *ok = false + return errors.E(op, err) + } + *ok = true + return nil +} + +func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { + r.log.Debug("message published", "msg", msg) + + // just return in case of nil message + if msg == nil { + *ok = true + return nil + } + // publish to the registered broker + r.plugin.PublishAsync(msg) + + *ok = true + return nil +} diff --git a/plugins/websockets/schema/message.fbs b/plugins/websockets/schema/message.fbs new file mode 100644 index 00000000..f2d92c78 --- /dev/null +++ b/plugins/websockets/schema/message.fbs @@ -0,0 +1,10 @@ +namespace message; + +table Message { + command:string; + broker:string; + topics:[string]; + payload:[byte]; +} + +root_type Message; diff --git a/plugins/websockets/schema/message/Message.go b/plugins/websockets/schema/message/Message.go new file mode 100644 index 00000000..26bbd12c --- /dev/null +++ b/plugins/websockets/schema/message/Message.go @@ -0,0 +1,118 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Message struct { + _tab flatbuffers.Table +} + +func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Message{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Message{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Message) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Message) Command() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Broker() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Topics(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Message) TopicsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) Payload(j int) int8 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Message) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) MutatePayload(j int, n int8) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func MessageStart(builder *flatbuffers.Builder) { + builder.StartObject(4) +} +func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0) +} +func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0) +} +func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0) +} +func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) +} +func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go new file mode 100644 index 00000000..ac256be2 --- /dev/null +++ b/plugins/websockets/storage/storage.go @@ -0,0 +1,79 @@ +package storage + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/pkg/bst" +) + +type Storage struct { + sync.RWMutex + BST bst.Storage +} + +func NewStorage() *Storage { + return &Storage{ + BST: bst.NewBST(), + } +} + +func (s *Storage) InsertMany(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Insert(connID, topics[i]) + } +} + +func (s *Storage) Insert(connID string, topic string) { + s.Lock() + defer s.Unlock() + + s.BST.Insert(connID, topic) +} + +func (s *Storage) RemoveMany(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Remove(connID, topics[i]) + } +} + +func (s *Storage) Remove(connID string, topic string) { + s.Lock() + defer s.Unlock() + + s.BST.Remove(connID, topic) +} + +// GetByPtrTS Thread safe get +func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + d := s.BST.Get(topics[i]) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } + } +} + +func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) { + s.RLock() + defer s.RUnlock() + + for i := 0; i < len(topics); i++ { + d := s.BST.Get(topics[i]) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } + } +} diff --git a/plugins/websockets/storage/storage_test.go b/plugins/websockets/storage/storage_test.go new file mode 100644 index 00000000..4072992a --- /dev/null +++ b/plugins/websockets/storage/storage_test.go @@ -0,0 +1,299 @@ +package storage + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +const predifined = "chat-1-2" + +func TestNewBST(t *testing.T) { + // create a new bst + g := NewStorage() + + for i := 0; i < 100; i++ { + g.InsertMany(uuid.NewString(), []string{"comments"}) + } + + for i := 0; i < 100; i++ { + g.InsertMany(uuid.NewString(), []string{"comments2"}) + } + + for i := 0; i < 100; i++ { + g.InsertMany(uuid.NewString(), []string{"comments3"}) + } + + res := make(map[string]struct{}, 100) + assert.Len(t, res, 0) + + // should be 100 + g.GetByPtr([]string{"comments"}, res) + assert.Len(t, res, 100) + + res = make(map[string]struct{}, 100) + assert.Len(t, res, 0) + + // should be 100 + g.GetByPtr([]string{"comments2"}, res) + assert.Len(t, res, 100) + + res = make(map[string]struct{}, 100) + assert.Len(t, res, 0) + + // should be 100 + g.GetByPtr([]string{"comments3"}, res) + assert.Len(t, res, 100) +} + +func BenchmarkGraph(b *testing.B) { + g := NewStorage() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.InsertMany(uuid.NewString(), []string{uid}) + } + + g.Insert(uuid.NewString(), predifined) + + b.ResetTimer() + b.ReportAllocs() + + res := make(map[string]struct{}) + + for i := 0; i < b.N; i++ { + g.GetByPtr([]string{predifined}, res) + + for i := range res { + delete(res, i) + } + } +} + +func BenchmarkBigSearch(b *testing.B) { + g1 := NewStorage() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + b.ResetTimer() + b.ReportAllocs() + + res := make(map[string]struct{}, 333) + + for i := 0; i < b.N; i++ { + g1.GetByPtr(predefinedSlice, res) + + for i := range res { + delete(res, i) + } + } +} + +func BenchmarkBigSearchWithRemoves(b *testing.B) { + g1 := NewStorage() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + b.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + b.ResetTimer() + b.ReportAllocs() + + go func() { + tt := time.NewTicker(time.Microsecond) + + res := make(map[string]struct{}, 1000) + for { + select { + case <-tt.C: + num := rand.Intn(1000) //nolint:gosec + g1.GetByPtr(predefinedSlice, res) + for k := range res { + g1.Remove(k, predefinedSlice[num]) + } + } + } + }() + + res := make(map[string]struct{}, 100) + + for i := 0; i < b.N; i++ { + g1.GetByPtr(predefinedSlice, res) + + for i := range res { + delete(res, i) + } + } +} + +func TestBigSearchWithRemoves(t *testing.T) { + g1 := NewStorage() + + predefinedSlice := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + predefinedSlice = append(predefinedSlice, uuid.NewString()) + } + if predefinedSlice == nil { + t.FailNow() + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), uuid.NewString()) + } + + for i := 0; i < 1000; i++ { + g1.Insert(uuid.NewString(), predefinedSlice[i]) + } + + stopCh := make(chan struct{}) + + go func() { + tt := time.NewTicker(time.Microsecond) + + res := make(map[string]struct{}, 1000) + for { + select { + case <-tt.C: + num := rand.Intn(1000) //nolint:gosec + g1.GetByPtr(predefinedSlice, res) + for k := range res { + g1.Remove(k, predefinedSlice[num]) + } + + case <-stopCh: + tt.Stop() + return + } + } + }() + + res := make(map[string]struct{}, 100) + + for i := 0; i < 1000; i++ { + g1.GetByPtr(predefinedSlice, res) + + for i := range res { + delete(res, i) + } + } + + stopCh <- struct{}{} +} + +func TestGraph(t *testing.T) { + g := NewStorage() + + for i := 0; i < 1000; i++ { + uid := uuid.New().String() + g.Insert(uuid.NewString(), uid) + } + + g.Insert(uuid.NewString(), predifined) + + res := make(map[string]struct{}) + + g.GetByPtr([]string{predifined}, res) + assert.NotEmpty(t, res) + assert.Len(t, res, 1) +} + +func TestTreeConcurrentContains(t *testing.T) { + g := NewStorage() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + res := make(map[string]struct{}, 100) + + for i := 0; i < 100; i++ { + go func() { + g.GetByPtrTS([]string{predifined}, res) + }() + + go func() { + g.GetByPtrTS([]string{predifined}, res) + }() + + go func() { + g.GetByPtrTS([]string{predifined}, res) + }() + + go func() { + g.GetByPtrTS([]string{predifined}, res) + }() + } + + time.Sleep(time.Second * 5) + + res2 := make(map[string]struct{}, 5) + + g.GetByPtr([]string{predifined}, res2) + assert.NotEmpty(t, res2) + assert.Len(t, res2, 5) +} + +func TestGraphRemove(t *testing.T) { + g := NewStorage() + + key1 := uuid.NewString() + key2 := uuid.NewString() + key3 := uuid.NewString() + key4 := uuid.NewString() + key5 := uuid.NewString() + + g.Insert(key1, predifined) + g.Insert(key2, predifined) + g.Insert(key3, predifined) + g.Insert(key4, predifined) + g.Insert(key5, predifined) + + res := make(map[string]struct{}, 5) + g.GetByPtr([]string{predifined}, res) + assert.NotEmpty(t, res) + assert.Len(t, res, 5) + + g.Remove(key1, predifined) + + res2 := make(map[string]struct{}, 4) + g.GetByPtr([]string{predifined}, res2) + assert.NotEmpty(t, res2) + assert.Len(t, res2, 4) +} diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go new file mode 100644 index 00000000..e666f846 --- /dev/null +++ b/plugins/websockets/validator/access_validator.go @@ -0,0 +1,76 @@ +package validator + +import ( + "net/http" + "strings" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" + "github.com/spiral/roadrunner/v2/plugins/http/attributes" +) + +type AccessValidatorFn = func(r *http.Request, channels ...string) (*AccessValidator, error) + +type AccessValidator struct { + Header http.Header `json:"headers"` + Status int `json:"status"` + Body []byte +} + +func ServerAccessValidator(r *http.Request) ([]byte, error) { + const op = errors.Op("server_access_validator") + + err := attributes.Set(r, "ws:joinServer", true) + if err != nil { + return nil, errors.E(op, err) + } + + defer delete(attributes.All(r), "ws:joinServer") + + req := &handler.Request{ + RemoteAddr: handler.FetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: handler.URI(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + Attributes: attributes.All(r), + } + + data, err := json.Marshal(req) + if err != nil { + return nil, errors.E(op, err) + } + + return data, nil +} + +func TopicsAccessValidator(r *http.Request, topics ...string) ([]byte, error) { + const op = errors.Op("topic_access_validator") + err := attributes.Set(r, "ws:joinTopics", strings.Join(topics, ",")) + if err != nil { + return nil, errors.E(op, err) + } + + defer delete(attributes.All(r), "ws:joinTopics") + + req := &handler.Request{ + RemoteAddr: handler.FetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: handler.URI(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + Attributes: attributes.All(r), + } + + data, err := json.Marshal(req) + if err != nil { + return nil, errors.E(op, err) + } + + return data, nil +} diff --git a/tests/Dockerfile b/tests/Dockerfile new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/Dockerfile diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 575fe656..f6533dc4 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -12,8 +12,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/stretchr/testify/assert" "net/http" diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go index 15c82839..d75620f3 100644 --- a/tests/plugins/http/parse_test.go +++ b/tests/plugins/http/parse_test.go @@ -3,7 +3,7 @@ package http import ( "testing" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" ) var samples = []struct { diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 3564d9cd..276c22ef 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/spiral/roadrunner/v2/pkg/payload" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index 5c39589c..903a930a 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -18,8 +18,8 @@ import ( j "github.com/json-iterator/go" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml index e1edbb44..94c9a856 100644 --- a/tests/plugins/informer/.rr-informer.yaml +++ b/tests/plugins/informer/.rr-informer.yaml @@ -3,8 +3,8 @@ server: user: "" group: "" env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" + - RR_CONFIG: "/some/place/on/the/C134" + - RR_CONFIG: "C138" relay: "pipes" relay_timeout: "20s" @@ -12,4 +12,4 @@ rpc: listen: tcp://127.0.0.1:6001 logs: mode: development - level: error
\ No newline at end of file + level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml new file mode 100644 index 00000000..dc073be3 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -0,0 +1,39 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11111 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # pubsubs might use general config section or its own + pubsubs: [ "redis" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml new file mode 100644 index 00000000..896cee05 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-ok.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11113 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml new file mode 100644 index 00000000..e3bf5218 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-deny.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11112 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml new file mode 100644 index 00000000..0614f4e7 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml @@ -0,0 +1,37 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../worker-stop.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11114 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + pubsubs: [ "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml new file mode 100644 index 00000000..eedf5377 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml @@ -0,0 +1,39 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:13235 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # pubsubs might use general config section + pubsubs: [ "redis", "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go new file mode 100644 index 00000000..772b53ac --- /dev/null +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -0,0 +1,885 @@ +package websockets + +import ( + "net" + "net/http" + "net/rpc" + "net/url" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory" + "github.com/spiral/roadrunner/v2/plugins/redis" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/spiral/roadrunner/v2/utils" + "github.com/stretchr/testify/assert" +) + +type Msg struct { + // Topic message been pushed into. + Topics []string `json:"topic"` + + // Command (join, leave, headers) + Command string `json:"command"` + + // Broker (redis, memory) + Broker string `json:"broker"` + + // Payload to be broadcasted + Payload []byte `json:"payload"` +} + +func TestBroadcastInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("TestWSInit", wsInit) + + stopCh <- struct{}{} + + wg.Wait() +} + +func wsInit(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func TestWSRedisAndMemory(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-redis-memory.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemory", RPCWsMemory) + t.Run("RPCWsRedis", RPCWsRedis) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryPubAsync(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publishAsync("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publishAsync("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publishAsync2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func RPCWsMemory(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func RPCWsRedis(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "redis", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "redis", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "redis", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func TestWSMemoryDeny(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-deny.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryDeny", RPCWsMemoryDeny) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryDeny(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11112", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func TestWSMemoryStop(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-stop.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryStop", RPCWsMemoryStop) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryStop(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11114", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NotNil(t, resp) + assert.Error(t, err) + assert.Nil(t, c) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck + assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck + if resp != nil && resp.Body != nil { //nolint:staticcheck + _ = resp.Body.Close() + } +} + +func TestWSMemoryOk(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-memory-allow.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryAllow", RPCWsMemoryAllow) + + stopCh <- struct{}{} + + wg.Wait() +} + +func RPCWsMemoryAllow(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11113", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + + defer func() { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + }() + + d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + publish("", "memory", "foo") + + // VERIFY a message + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP", retMsg) + + // //// LEAVE foo, foo2 ///////// + d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg) + + // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC + publish("", "memory", "foo") + + go func() { + time.Sleep(time.Second * 5) + publish2("", "memory", "foo2") + }() + + // should be only message from the subscribed foo2 topic + _, msg, err = c.ReadMessage() + retMsg = utils.AsString(msg) + assert.NoError(t, err) + assert.Equal(t, "hello, PHP2", retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func publish(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publishAsync(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publishAsync2(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func publish2(command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + var ret bool + err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret) + if err != nil { + panic(err) + } +} + +func message(command string, broker string, payload []byte, topics ...string) *Msg { + return &Msg{ + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + } +} diff --git a/tests/worker-deny.php b/tests/worker-deny.php new file mode 100644 index 00000000..6dc993f6 --- /dev/null +++ b/tests/worker-deny.php @@ -0,0 +1,30 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require __DIR__ . "/vendor/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + +while ($req = $psr7->waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + if ($req->getAttribute('ws:joinServer')) { + $psr7->respond($resp->withStatus(200)); + } else { + $psr7->respond($resp->withStatus(401)); + } + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/tests/worker-ok.php b/tests/worker-ok.php new file mode 100644 index 00000000..63558b0f --- /dev/null +++ b/tests/worker-ok.php @@ -0,0 +1,27 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require __DIR__ . "/vendor/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + +while ($req = $psr7->waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? ''); + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/tests/worker-stop.php b/tests/worker-stop.php new file mode 100644 index 00000000..83fc5710 --- /dev/null +++ b/tests/worker-stop.php @@ -0,0 +1,26 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require __DIR__ . "/vendor/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + +while ($req = $psr7->waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401)); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} diff --git a/utils/network.go b/utils/network.go index b73363db..86a7e733 100755 --- a/utils/network.go +++ b/utils/network.go @@ -12,7 +12,8 @@ import ( "github.com/valyala/tcplisten" ) -// - SO_REUSEPORT. This option allows linear scaling server performance +// CreateListener +// - SO_REUSEPORT. This option allows linear scaling server performance // on multi-CPU servers. // See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details. // |