summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linters.yml2
-rw-r--r--.github/workflows/linux.yml3
-rwxr-xr-x.gitignore1
-rwxr-xr-x.golangci.yml9
-rwxr-xr-xMakefile6
-rw-r--r--codecov.yml21
-rw-r--r--go.mod9
-rw-r--r--go.sum24
-rw-r--r--pkg/bst/bst.go136
-rw-r--r--pkg/bst/bst_test.go394
-rw-r--r--pkg/bst/doc.go7
-rw-r--r--pkg/bst/interface.go11
-rw-r--r--pkg/pool/interface.go2
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/pubsub/interface.go32
-rw-r--r--pkg/pubsub/message.go24
-rw-r--r--pkg/worker_handler/constants.go (renamed from plugins/http/worker_handler/constants.go)0
-rw-r--r--pkg/worker_handler/errors.go (renamed from plugins/http/worker_handler/errors.go)0
-rw-r--r--pkg/worker_handler/errors_windows.go (renamed from plugins/http/worker_handler/errors_windows.go)0
-rw-r--r--pkg/worker_handler/handler.go (renamed from plugins/http/worker_handler/handler.go)8
-rw-r--r--pkg/worker_handler/parse.go (renamed from plugins/http/worker_handler/parse.go)0
-rw-r--r--pkg/worker_handler/request.go (renamed from plugins/http/worker_handler/request.go)10
-rw-r--r--pkg/worker_handler/response.go (renamed from plugins/http/worker_handler/response.go)0
-rw-r--r--pkg/worker_handler/uploads.go (renamed from plugins/http/worker_handler/uploads.go)0
-rw-r--r--pkg/worker_watcher/interface.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go6
-rw-r--r--plugins/config/interface.go4
-rw-r--r--plugins/http/config/http.go6
-rw-r--r--plugins/http/plugin.go243
-rw-r--r--plugins/http/serve.go76
-rw-r--r--plugins/kv/drivers/boltdb/driver.go3
-rw-r--r--plugins/kv/drivers/memcached/driver.go3
-rw-r--r--plugins/kv/drivers/memory/driver.go3
-rw-r--r--plugins/kv/drivers/redis/plugin.go2
-rw-r--r--plugins/kv/storage.go4
-rw-r--r--plugins/logger/interface.go2
-rw-r--r--plugins/logger/std_log_adapter.go9
-rw-r--r--plugins/memory/plugin.go81
-rw-r--r--plugins/redis/fanin.go100
-rw-r--r--plugins/redis/plugin.go134
-rw-r--r--plugins/reload/watcher.go2
-rw-r--r--plugins/server/interface.go3
-rw-r--r--plugins/server/plugin.go4
-rw-r--r--plugins/websockets/commands/enums.go9
-rw-r--r--plugins/websockets/config.go58
-rw-r--r--plugins/websockets/connection/connection.go67
-rw-r--r--plugins/websockets/doc/broadcast.drawio1
-rw-r--r--plugins/websockets/doc/doc.go27
-rw-r--r--plugins/websockets/executor/executor.go226
-rw-r--r--plugins/websockets/plugin.go386
-rw-r--r--plugins/websockets/pool/workers_pool.go117
-rw-r--r--plugins/websockets/rpc.go47
-rw-r--r--plugins/websockets/schema/message.fbs10
-rw-r--r--plugins/websockets/schema/message/Message.go118
-rw-r--r--plugins/websockets/storage/storage.go79
-rw-r--r--plugins/websockets/storage/storage_test.go299
-rw-r--r--plugins/websockets/validator/access_validator.go76
-rw-r--r--tests/Dockerfile0
-rw-r--r--tests/plugins/http/handler_test.go2
-rw-r--r--tests/plugins/http/parse_test.go2
-rw-r--r--tests/plugins/http/response_test.go2
-rw-r--r--tests/plugins/http/uploads_test.go2
-rw-r--r--tests/plugins/informer/.rr-informer.yaml6
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml39
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml37
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml37
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml37
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml39
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go885
-rw-r--r--tests/worker-deny.php30
-rw-r--r--tests/worker-ok.php27
-rw-r--r--tests/worker-stop.php26
-rwxr-xr-xutils/network.go3
74 files changed, 3831 insertions, 259 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 82072675..cee7085c 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -13,6 +13,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
- version: v1.39 # without patch version
+ version: v1.40 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 89173b3f..a8f97d12 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -60,8 +60,10 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/transport/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/storage_ws.txt -covermode=atomic ./plugins/websockets/storage
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
@@ -77,6 +79,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
docker-compose -f ./tests/docker-compose.yaml down
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
diff --git a/.gitignore b/.gitignore
index 9a9a07b6..beb3f885 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ tests/vendor/
.rr-sample.yaml
cmd
rr
+**/old
diff --git a/.golangci.yml b/.golangci.yml
index bfc69f57..41e1d68f 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -14,8 +14,10 @@ output:
linters-settings:
govet:
check-shadowing: true
- golint:
- min-confidence: 0.1
+ revive:
+ confidence: 0.8
+ errorCode: 0
+ warningCode: 0
gocyclo:
min-complexity: 15
godot:
@@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- gocritic # The most opinionated Go source code linter
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
+ - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
- goprintffuncname # Checks that printf-like functions are named with `f` at the end
- gosec # Inspects source code for security problems
- gosimple # Linter for Go source code that specializes in simplifying a code
@@ -88,3 +90,4 @@ issues:
- goconst
- noctx
- gosimple
+ - revive
diff --git a/Makefile b/Makefile
index 6ff605ab..32f3839c 100755
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,9 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pool.out -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/storage-ws.out -covermode=atomic ./plugins/websockets/storage
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
@@ -29,6 +31,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/resetter.out -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/rpc.out -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/kv_plugin.out -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_plugin.out -covermode=atomic ./tests/plugins/websockets
cat ./coverage/*.out > ./coverage/summary.out
docker-compose -f tests/docker-compose.yaml down
@@ -39,7 +42,9 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
+ go test -v -race -tags=debug ./pkg/bst
go test -v -race -tags=debug ./tests/plugins/http
+ go test -v -race -tags=debug ./plugins/websockets/storage
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
@@ -55,4 +60,5 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/resetter
go test -v -race -tags=debug ./tests/plugins/rpc
go test -v -race -tags=debug ./tests/plugins/kv
+ go test -v -race -tags=debug ./tests/plugins/websockets
docker-compose -f tests/docker-compose.yaml down
diff --git a/codecov.yml b/codecov.yml
index 43716f56..da01f6f7 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -14,10 +14,21 @@ coverage:
# do not include tests folders
ignore:
- "tests"
- - "plugins/kv/boltdb/plugin_unit_test.go"
- - "plugins/kv/memcached/plugin_unit_test.go"
- - "plugins/kv/memory/plugin_unit_test.go"
+ - "plugins/metrics/config_test.go"
+ - "plugins/websockets/storage/storage_test.go"
+ - "pkg/bst/bst_test.go"
+ - "pkg/doc"
+ - "pkg/pool/static_pool_test.go"
+ - "pkg/pool/supervisor_test.go"
+ - "pkg/pubsub"
+ - "pkg/transport/pipe/pipe_factory_spawn_test.go"
+ - "pkg/transport/pipe/pipe_factory_test.go"
+ - "pkg/transport/socket/socket_factory_spawn_test.go"
+ - "pkg/transport/socket/socket_factory_test.go"
+ - "pkg/transport/interface.go"
+ - "pkg/worker/state_test.go"
+ - "pkg/worker/sync_worker_test.go"
+ - "pkg/worker/worker_test.go"
- "pkg/events/pool_events.go"
- "pkg/events/worker_events.go"
- - "interfaces"
- - "systemd" \ No newline at end of file
+ - "systemd"
diff --git a/go.mod b/go.mod
index 72191467..19a9156a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,16 +8,20 @@ require (
github.com/alicebob/miniredis/v2 v2.14.5
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/dustin/go-humanize v1.0.0
+ github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-redis/redis/v8 v8.9.0
github.com/gofiber/fiber/v2 v2.10.0
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
+ github.com/google/uuid v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
+ github.com/klauspost/compress v1.12.2 // indirect
github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/client_golang v1.10.0
+ github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/spf13/viper v1.7.1
// SPIRAL ====
@@ -27,12 +31,13 @@ require (
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.4 // indirect
- github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
+ github.com/valyala/fasthttp v1.24.0 // indirect
+ github.com/valyala/tcplisten v1.0.0
github.com/yookoala/gofast v0.6.0
go.etcd.io/bbolt v1.3.5
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.17.0
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
- golang.org/x/sys v0.0.0-20210309074719-68d13333faf2
+ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
)
diff --git a/go.sum b/go.sum
index 7f5c5d57..94b71b07 100644
--- a/go.sum
+++ b/go.sum
@@ -33,6 +33,7 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.14.5 h1:iCFJiSur7871KaFJLAsBEpmc3DJHJ4YuB7W1hYLWs+U=
github.com/alicebob/miniredis/v2 v2.14.5/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
+github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
@@ -96,6 +97,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/fasthttp/websocket v1.4.3 h1:qjhRJ/rTy4KB8oBxljEC00SDt6HUY9jLRfM601SUdS4=
+github.com/fasthttp/websocket v1.4.3/go.mod h1:5r4oKssgS7W6Zn6mPWap3NWzNPJNzUUh3baWTOhcYQk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
@@ -146,6 +149,7 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
@@ -163,6 +167,8 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -225,9 +231,11 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
+github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -358,6 +366,9 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
+github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8=
+github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 h1:ug9pfpEqVhvazyl1GezkQ9M/XdWsQn3VSx0s4qfH82I=
+github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645/go.mod h1:CN0/b5o0sSBi9K8fZTUamBG5NZXO0I64vTh9L3Mzhn0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8=
github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
@@ -415,10 +426,13 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
-github.com/valyala/fasthttp v1.23.0 h1:0ufwSD9BhWa6f8HWdmdq4FHQ23peRo3Ng/Qs8m5NcFs=
+github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE=
github.com/valyala/fasthttp v1.23.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU=
-github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc=
+github.com/valyala/fasthttp v1.24.0 h1:AAiG4oLDUArTb7rYf9oO2bkGooOqCaUF6a2u8asBP3I=
+github.com/valyala/fasthttp v1.24.0/go.mod h1:0mw2RjXGOzxf4NL2jni3gUQ7LfjjUSiG5sskOUUSEpU=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
+github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
+github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@@ -512,6 +526,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -570,8 +585,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E=
+golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
new file mode 100644
index 00000000..664937ba
--- /dev/null
+++ b/pkg/bst/bst.go
@@ -0,0 +1,136 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{
+ uuids: make(map[string]struct{}, 10),
+ }
+}
+
+// Insert uuid to the topic
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
+
+ for {
+ if topic == curr.topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if topic < curr.topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
+ for curr != nil {
+ switch {
+ case topic < curr.topic:
+ curr = curr.left
+ case topic > curr.topic:
+ curr = curr.right
+ case topic == curr.topic:
+ return curr.uuids
+ }
+ }
+
+ return nil
+}
+
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
+}
+
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
+ for curr != nil {
+ if topic < curr.topic { //nolint:gocritic
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ // if more than 1 topic - remove only topic, do not remove the whole vertex
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil { //nolint:gocritic
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil { //nolint:gocritic
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else { //nolint:staticcheck
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
+ }
+ return b.left.traverseForMinString()
+}
diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go
new file mode 100644
index 00000000..e4d4e4c3
--- /dev/null
+++ b/pkg/bst/bst_test.go
@@ -0,0 +1,394 @@
+package bst
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+const predifined = "chat-1-2"
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ // should be 100
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ // should be 100
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ // should be 100
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
+
+func BenchmarkGraph(b *testing.B) {
+ g := NewBST()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.Insert(uuid.NewString(), uid)
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ exist := g.Get(predifined)
+ _ = exist
+ }
+}
+
+func BenchmarkBigSearch(b *testing.B) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ _ = exist
+ }
+ }
+}
+
+func BenchmarkBigSearchWithRemoves(b *testing.B) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ go func() {
+ tt := time.NewTicker(time.Millisecond)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(333) //nolint:gosec
+ values := g1.Get(predefinedSlice[num])
+ for k := range values {
+ g1.Remove(k, predefinedSlice[num])
+ }
+ }
+ }
+ }()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ _ = exist
+ }
+ }
+ for i := 0; i < b.N; i++ {
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ _ = exist
+ }
+ }
+}
+
+func TestGraph(t *testing.T) {
+ g := NewBST()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.Insert(uuid.NewString(), uid)
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+}
+
+func TestTreeConcurrentContains(t *testing.T) {
+ g := NewBST()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+
+ go func() {
+ _ = g.Get(predifined)
+ }()
+ }
+
+ time.Sleep(time.Second * 2)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 5)
+}
+
+func TestGraphRemove(t *testing.T) {
+ g := NewBST()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ exist := g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 5)
+
+ g.Remove(key1, predifined)
+
+ exist = g.Get(predifined)
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 4)
+}
+
+func TestBigSearch(t *testing.T) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ t.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 1000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+}
+
+func TestBigSearchWithRemoves(t *testing.T) {
+ g1 := NewBST()
+ g2 := NewBST()
+ g3 := NewBST()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ t.FailNow()
+ }
+
+ for i := 0; i < 100000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 100000; i++ {
+ g2.Insert(uuid.NewString(), uuid.NewString())
+ }
+ for i := 0; i < 100000; i++ {
+ g3.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 333; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g2.Insert(uuid.NewString(), predefinedSlice[333+i])
+ }
+
+ for i := 0; i < 333; i++ {
+ g3.Insert(uuid.NewString(), predefinedSlice[666+i])
+ }
+
+ time.Sleep(time.Second * 1)
+ go func() {
+ tt := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(333) //nolint:gosec
+ values := g1.Get(predefinedSlice[num])
+ for k := range values {
+ g1.Remove(k, predefinedSlice[num])
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 333; i++ {
+ exist := g1.Get(predefinedSlice[i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g2.Get(predefinedSlice[333+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+
+ for i := 0; i < 333; i++ {
+ exist := g3.Get(predefinedSlice[666+i])
+ assert.NotNil(t, exist)
+ assert.Len(t, exist, 1)
+ }
+}
diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go
new file mode 100644
index 00000000..abb7e6e9
--- /dev/null
+++ b/pkg/bst/doc.go
@@ -0,0 +1,7 @@
+package bst
+
+/*
+Binary search tree for the pubsub
+
+The vertex may have one or multiply topics associated with the single websocket connection UUID
+*/
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/pkg/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4ef2f2e7..c22fbbd3 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -18,7 +18,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
- // Remove worker from the pool.
+ // RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d57cc95c..b5d97b8b 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:golint,stylecheck
+ err_encoder ErrorEncoder //nolint:stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
- const op = errors.Op("error encoder")
+ const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 40903db3..ca61dbc4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -15,7 +15,7 @@ import (
const MB = 1024 * 1024
// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
type Supervised interface {
Pool
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..caf8783f
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,32 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []*Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []*Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (*Message, error)
+}
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..c17d153b
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,24 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Message struct {
+ // Command (join, leave, headers)
+ Command string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker string `json:"broker"`
+
+ // Topic message been pushed into.
+ Topics []string `json:"topic"`
+
+ // Payload to be broadcasted
+ Payload []byte `json:"payload"`
+}
+
+// MarshalBinary needed to marshal message for the redis
+func (m *Message) MarshalBinary() ([]byte, error) {
+ return json.Marshal(m)
+}
diff --git a/plugins/http/worker_handler/constants.go b/pkg/worker_handler/constants.go
index 3355d9c2..3355d9c2 100644
--- a/plugins/http/worker_handler/constants.go
+++ b/pkg/worker_handler/constants.go
diff --git a/plugins/http/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..5fa8e64e 100644
--- a/plugins/http/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
diff --git a/plugins/http/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..390cc7d1 100644
--- a/plugins/http/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
diff --git a/plugins/http/worker_handler/handler.go b/pkg/worker_handler/handler.go
index be53fc12..e0d1aae0 100644
--- a/plugins/http/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serve_http")
start := time.Now()
// validating request size
@@ -202,16 +202,16 @@ func (h *Handler) resolveIP(r *Request) {
// CF-Connecting-IP is an Enterprise feature and we check it last in order.
// This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
if r.Header.Get("X-Real-Ip") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
+ r.RemoteAddr = FetchIP(r.Header.Get("X-Real-Ip"))
return
}
if r.Header.Get("True-Client-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
+ r.RemoteAddr = FetchIP(r.Header.Get("True-Client-IP"))
return
}
if r.Header.Get("CF-Connecting-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
+ r.RemoteAddr = FetchIP(r.Header.Get("CF-Connecting-IP"))
}
}
diff --git a/plugins/http/worker_handler/parse.go b/pkg/worker_handler/parse.go
index 2790da2a..2790da2a 100644
--- a/plugins/http/worker_handler/parse.go
+++ b/pkg/worker_handler/parse.go
diff --git a/plugins/http/worker_handler/request.go b/pkg/worker_handler/request.go
index 178bc827..75ee8381 100644
--- a/plugins/http/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
@@ -61,7 +61,7 @@ type Request struct {
body interface{}
}
-func fetchIP(pair string) string {
+func FetchIP(pair string) string {
if !strings.ContainsRune(pair, ':') {
return pair
}
@@ -73,10 +73,10 @@ func fetchIP(pair string) string {
// NewRequest creates new PSR7 compatible request using net/http request.
func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) {
req := &Request{
- RemoteAddr: fetchIP(r.RemoteAddr),
+ RemoteAddr: FetchIP(r.RemoteAddr),
Protocol: r.Proto,
Method: r.Method,
- URI: uri(r),
+ URI: URI(r),
Header: r.Header,
Cookies: make(map[string]string),
RawQuery: r.URL.RawQuery,
@@ -174,8 +174,8 @@ func (r *Request) contentType() int {
return contentStream
}
-// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
-func uri(r *http.Request) string {
+// URI fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func URI(r *http.Request) string {
if r.URL.Host != "" {
return r.URL.String()
}
diff --git a/plugins/http/worker_handler/response.go b/pkg/worker_handler/response.go
index 1763d304..1763d304 100644
--- a/plugins/http/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
diff --git a/plugins/http/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
index e695000e..e695000e 100644
--- a/plugins/http/worker_handler/uploads.go
+++ b/pkg/worker_handler/uploads.go
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 4625b7a7..29fa3640 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -23,9 +23,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all container w/o removing it from internal storage
+ // List return all container w/o removing it from internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the container
+ // Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5aec4ee6..557563ac 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -11,7 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead container will not be replaced
+// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
@@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation, and it will return copy of the actual workers
+// List - this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 59ad981f..b3854e09 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -11,7 +11,7 @@ type Configurer interface {
// }
UnmarshalKey(name string, out interface{}) error
- // Unmarshal unmarshals the config into a Struct. Make sure that the tags
+ // Unmarshal unmarshal the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
Unmarshal(out interface{}) error
@@ -24,6 +24,6 @@ type Configurer interface {
// Has checks if config section exists.
Has(name string) bool
- // Returns General section. Read-only
+ // GetCommonConfig returns General section. Read-only
GetCommonConfig() *General
}
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go
index 8b63395f..a1c2afa6 100644
--- a/plugins/http/config/http.go
+++ b/plugins/http/config/http.go
@@ -7,7 +7,7 @@ import (
"time"
"github.com/spiral/errors"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
)
// HTTP configures RoadRunner HTTP server.
@@ -34,7 +34,7 @@ type HTTP struct {
Uploads *Uploads `mapstructure:"uploads"`
// Pool configures worker pool.
- Pool *poolImpl.Config `mapstructure:"pool"`
+ Pool *pool.Config `mapstructure:"pool"`
// Env is environment variables passed to the http pool
Env map[string]string
@@ -70,7 +70,7 @@ func (c *HTTP) EnableFCGI() bool {
func (c *HTTP) InitDefaults() error {
if c.Pool == nil {
// default pool
- c.Pool = &poolImpl.Config{
+ c.Pool = &pool.Config{
Debug: false,
NumWorkers: uint64(runtime.NumCPU()),
MaxJobs: 0,
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2b68bbe5..770ca8ca 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -7,16 +7,15 @@ import (
"net/http"
"sync"
- "github.com/hashicorp/go-multierror"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/status"
@@ -71,47 +70,47 @@ type Plugin struct {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
const op = errors.Op("http_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, err)
}
- err = s.cfg.InitDefaults()
+ err = p.cfg.InitDefaults()
if err != nil {
return errors.E(op, err)
}
// rr logger (via plugin)
- s.log = rrLogger
+ p.log = rrLogger
// use time and date in UTC format
- s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
+ p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
- s.mdwr = make(map[string]Middleware)
+ p.mdwr = make(map[string]Middleware)
- if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() {
+ if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() {
return errors.E(op, errors.Disabled)
}
// init if nil
- if s.cfg.Env == nil {
- s.cfg.Env = make(map[string]string)
+ if p.cfg.Env == nil {
+ p.cfg.Env = make(map[string]string)
}
- s.cfg.Env[RrMode] = "http"
- s.server = server
+ p.cfg.Env[RrMode] = "http"
+ p.server = server
return nil
}
-func (s *Plugin) logCallback(event interface{}) {
+func (p *Plugin) logCallback(event interface{}) {
if ev, ok := event.(handler.ResponseEvent); ok {
- s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
+ p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
"remote", ev.Request.RemoteAddr,
"elapsed", ev.Elapsed().String(),
)
@@ -119,60 +118,66 @@ func (s *Plugin) logCallback(event interface{}) {
}
// Serve serves the svc.
-func (s *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 2)
// run whole process in the goroutine
go func() {
// protect http initialization
- s.Lock()
- s.serve(errCh)
- s.Unlock()
+ p.Lock()
+ p.serve(errCh)
+ p.Unlock()
}()
return errCh
}
-func (s *Plugin) serve(errCh chan error) {
+func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler.AddListener(s.logCallback)
+ p.handler.AddListener(p.logCallback)
- if s.cfg.EnableHTTP() {
- if s.cfg.EnableH2C() {
- s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog}
+ if p.cfg.EnableHTTP() {
+ if p.cfg.EnableH2C() {
+ p.http = &http.Server{
+ Handler: h2c.NewHandler(p, &http2.Server{}),
+ ErrorLog: p.stdLog,
+ }
} else {
- s.http = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ p.http = &http.Server{
+ Handler: p,
+ ErrorLog: p.stdLog,
+ }
}
}
- if s.cfg.EnableTLS() {
- s.https = s.initSSL()
- if s.cfg.SSLConfig.RootCA != "" {
- err = s.appendRootCa()
+ if p.cfg.EnableTLS() {
+ p.https = p.initSSL()
+ if p.cfg.SSLConfig.RootCA != "" {
+ err = p.appendRootCa()
if err != nil {
errCh <- errors.E(op, err)
return
@@ -180,102 +185,95 @@ func (s *Plugin) serve(errCh chan error) {
}
// if HTTP2Config not nil
- if s.cfg.HTTP2Config != nil {
- if err := s.initHTTP2(); err != nil {
+ if p.cfg.HTTP2Config != nil {
+ if err := p.initHTTP2(); err != nil {
errCh <- errors.E(op, err)
return
}
}
}
- if s.cfg.EnableFCGI() {
- s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ if p.cfg.EnableFCGI() {
+ p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
// start http, https and fcgi servers if requested in the config
go func() {
- s.serveHTTP(errCh)
+ p.serveHTTP(errCh)
}()
go func() {
- s.serveHTTPS(errCh)
+ p.serveHTTPS(errCh)
}()
go func() {
- s.serveFCGI(errCh)
+ p.serveFCGI(errCh)
}()
}
// Stop stops the http.
-func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Stop() error {
+ p.Lock()
+ defer p.Unlock()
- var err error
- if s.fcgi != nil {
- err = s.fcgi.Shutdown(context.Background())
+ if p.fcgi != nil {
+ err := p.fcgi.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the fcgi server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
+ p.log.Error("fcgi shutdown", "error", err)
}
}
- if s.https != nil {
- err = s.https.Shutdown(context.Background())
+ if p.https != nil {
+ err := p.https.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the https server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
+ p.log.Error("https shutdown", "error", err)
}
}
- if s.http != nil {
- err = s.http.Shutdown(context.Background())
+ if p.http != nil {
+ err := p.http.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the http server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
+ p.log.Error("http shutdown", "error", err)
}
}
// check for safety
- if s.pool != nil {
- s.pool.Destroy(context.Background())
+ if p.pool != nil {
+ p.pool.Destroy(context.Background())
}
- return err
+ return nil
}
// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
-func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if headerContainsUpgrade(r) {
http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
return
}
- if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect {
- s.redirect(w, r)
+ if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect {
+ p.redirect(w, r)
return
}
- if s.https != nil && r.TLS != nil {
+ if p.https != nil && r.TLS != nil {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
r = attributes.Init(r)
// protect the case, when user sendEvent Reset and we are replacing handler with pool
- s.RLock()
- s.handler.ServeHTTP(w, r)
- s.RUnlock()
+ p.RLock()
+ p.handler.ServeHTTP(w, r)
+ p.RUnlock()
}
// Workers returns slice with the process states for the workers
-func (s *Plugin) Workers() []process.State {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Workers() []process.State {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
ps := make([]process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
@@ -290,74 +288,75 @@ func (s *Plugin) Workers() []process.State {
}
// internal
-func (s *Plugin) workers() []worker.BaseProcess {
- return s.pool.Workers()
+func (p *Plugin) workers() []worker.BaseProcess {
+ return p.pool.Workers()
}
// Name returns endure.Named interface implementation
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Reset destroys the old pool and replaces it with new one, waiting for old pool to die
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
const op = errors.Op("http_plugin_reset")
- s.log.Info("HTTP plugin got restart request. Restarting...")
- s.pool.Destroy(context.Background())
- s.pool = nil
+ p.log.Info("HTTP plugin got restart request. Restarting...")
+ p.pool.Destroy(context.Background())
+ p.pool = nil
var err error
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP workers Pool successfully restarted")
+ p.log.Info("HTTP workers Pool successfully restarted")
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
+
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP handler listeners successfully re-added")
- s.handler.AddListener(s.logCallback)
+ p.log.Info("HTTP handler listeners successfully re-added")
+ p.handler.AddListener(p.logCallback)
- s.log.Info("HTTP plugin successfully restarted")
+ p.log.Info("HTTP plugin successfully restarted")
return nil
}
// Collects collecting http middlewares
-func (s *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- s.AddMiddleware,
+ p.AddMiddleware,
}
}
// AddMiddleware is base requirement for the middleware (name and Middleware)
-func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
- s.mdwr[name.Name()] = m
+func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) {
+ p.mdwr[name.Name()] = m
}
// Status return status of the particular plugin
-func (s *Plugin) Status() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Status() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -372,14 +371,14 @@ func (s *Plugin) Status() status.Status {
}
// Ready return readiness status of the particular plugin
-func (s *Plugin) Ready() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Ready() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin's worker pool is ready
+ // we assume, that plugin'p worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
@@ -393,4 +392,4 @@ func (s *Plugin) Ready() status.Status {
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 78796322..bf1ccafe 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -17,46 +17,46 @@ import (
"golang.org/x/sys/cpu"
)
-func (s *Plugin) serveHTTP(errCh chan error) {
- if s.http == nil {
+func (p *Plugin) serveHTTP(errCh chan error) {
+ if p.http == nil {
return
}
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serveHTTP")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.http, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.Address)
+ l, err := utils.CreateListener(p.cfg.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.http.Serve(l)
+ err = p.http.Serve(l)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) serveHTTPS(errCh chan error) {
- if s.https == nil {
+func (p *Plugin) serveHTTPS(errCh chan error) {
+ if p.https == nil {
return
}
- const op = errors.Op("http_plugin_serve_https")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ const op = errors.Op("serveHTTPS")
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.SSLConfig.Address)
+ l, err := utils.CreateListener(p.cfg.SSLConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.https.ServeTLS(
+ err = p.https.ServeTLS(
l,
- s.cfg.SSLConfig.Cert,
- s.cfg.SSLConfig.Key,
+ p.cfg.SSLConfig.Cert,
+ p.cfg.SSLConfig.Key,
)
if err != nil && err != http.ErrServerClosed {
@@ -66,34 +66,34 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
}
// serveFCGI starts FastCGI server.
-func (s *Plugin) serveFCGI(errCh chan error) {
- if s.fcgi == nil {
+func (p *Plugin) serveFCGI(errCh chan error) {
+ if p.fcgi == nil {
return
}
- const op = errors.Op("http_plugin_serve_fcgi")
+ const op = errors.Op("serveFCGI")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.FCGIConfig.Address)
+ l, err := utils.CreateListener(p.cfg.FCGIConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = fcgi.Serve(l, s.fcgi.Handler)
+ err = fcgi.Serve(l, p.fcgi.Handler)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
target := &url.URL{
Scheme: HTTPSScheme,
// host or host:port
- Host: s.tlsAddr(r.Host, false),
+ Host: p.tlsAddr(r.Host, false),
Path: r.URL.Path,
RawQuery: r.URL.RawQuery,
}
@@ -111,7 +111,7 @@ func headerContainsUpgrade(r *http.Request) bool {
}
// append RootCA to the https server TLS config
-func (s *Plugin) appendRootCa() error {
+func (p *Plugin) appendRootCa() error {
const op = errors.Op("http_plugin_append_root_ca")
rootCAs, err := x509.SystemCertPool()
if err != nil {
@@ -121,7 +121,7 @@ func (s *Plugin) appendRootCa() error {
rootCAs = x509.NewCertPool()
}
- CA, err := os.ReadFile(s.cfg.SSLConfig.RootCA)
+ CA, err := os.ReadFile(p.cfg.SSLConfig.RootCA)
if err != nil {
return err
}
@@ -137,13 +137,13 @@ func (s *Plugin) appendRootCa() error {
InsecureSkipVerify: false,
RootCAs: rootCAs,
}
- s.http.TLSConfig = cfg
+ p.http.TLSConfig = cfg
return nil
}
// Init https server
-func (s *Plugin) initSSL() *http.Server {
+func (p *Plugin) initSSL() *http.Server {
var topCipherSuites []uint16
var defaultCipherSuitesTLS13 []uint16
@@ -193,9 +193,9 @@ func (s *Plugin) initSSL() *http.Server {
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
sslServer := &http.Server{
- Addr: s.tlsAddr(s.cfg.Address, true),
- Handler: s,
- ErrorLog: s.stdLog,
+ Addr: p.tlsAddr(p.cfg.Address, true),
+ Handler: p,
+ ErrorLog: p.stdLog,
TLSConfig: &tls.Config{
CurvePreferences: []tls.CurveID{
tls.CurveP256,
@@ -213,19 +213,19 @@ func (s *Plugin) initSSL() *http.Server {
}
// init http/2 server
-func (s *Plugin) initHTTP2() error {
- return http2.ConfigureServer(s.https, &http2.Server{
- MaxConcurrentStreams: s.cfg.HTTP2Config.MaxConcurrentStreams,
+func (p *Plugin) initHTTP2() error {
+ return http2.ConfigureServer(p.https, &http2.Server{
+ MaxConcurrentStreams: p.cfg.HTTP2Config.MaxConcurrentStreams,
})
}
// tlsAddr replaces listen or host port with port configured by SSLConfig config.
-func (s *Plugin) tlsAddr(host string, forcePort bool) string {
+func (p *Plugin) tlsAddr(host string, forcePort bool) string {
// remove current forcePort first
host = strings.Split(host, ":")[0]
- if forcePort || s.cfg.SSLConfig.Port != 443 {
- host = fmt.Sprintf("%s:%v", host, s.cfg.SSLConfig.Port)
+ if forcePort || p.cfg.SSLConfig.Port != 443 {
+ host = fmt.Sprintf("%s:%v", host, p.cfg.SSLConfig.Port)
}
return host
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 2e2df527..0f647cb1 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
@@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- err := b.Delete([]byte(k))
+ err := b.Delete(utils.AsBytes(k))
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 17b06fa0..02281ed5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: []byte(items[i].Value),
+ Value: utils.AsBytes(items[i].Value),
Flags: 0,
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 1e0d03d4..c2494ee7 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(kv.Item).Value), nil
}
return nil, nil
}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
index d2183411..3694c5a7 100644
--- a/plugins/kv/drivers/redis/plugin.go
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
// Serve is noop here
func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
+ return make(chan error)
}
func (s *Plugin) Stop() error {
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index fe2fa10b..9a609735 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -90,9 +90,9 @@ func (p *Plugin) Serve() chan error {
return errCh
}
- // config key for the particular sub-driver
+ // config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
- // at this point we know, that driver field present in the cofiguration
+ // at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.drivers[memcached]; !ok {
diff --git a/plugins/logger/interface.go b/plugins/logger/interface.go
index 5bb2143b..827f9821 100644
--- a/plugins/logger/interface.go
+++ b/plugins/logger/interface.go
@@ -8,7 +8,7 @@ type Logger interface {
Error(msg string, keyvals ...interface{})
}
-// With creates a child logger and adds structured context to it
+// WithLogger creates a child logger and adds structured context to it
type WithLogger interface {
With(keyvals ...interface{}) Logger
}
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index 484cc23e..479aa565 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -1,7 +1,7 @@
package logger
import (
- "unsafe"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output
@@ -12,7 +12,7 @@ type StdLogAdapter struct {
// Write io.Writer interface implementation
func (s *StdLogAdapter) Write(p []byte) (n int, err error) {
- s.log.Error("server internal error", "message", toString(p))
+ s.log.Error("server internal error", "message", utils.AsString(p))
return len(p), nil
}
@@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter {
return logAdapter
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..49c187bc
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,81 @@
+package memory
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+
+ // channel with the messages from the RPC
+ pushCh chan *pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan *pubsub.Message, 100)
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Publish(messages []*pubsub.Message) error {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(messages []*pubsub.Message) {
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Next() (*pubsub.Message, error) {
+ msg := <-p.pushCh
+
+ if msg == nil {
+ return nil, nil
+ }
+
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics); i++ {
+ if _, ok := p.topics.Load(msg.Topics[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
+}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..93b13124
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan *pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan *pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the websockets.Msg interface
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Message{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan *pubsub.Message {
+ return fi.out
+}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 2eab7043..c1480de8 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,73 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []*pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (*pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
}
diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go
index 421668b3..c40c2fdf 100644
--- a/plugins/reload/watcher.go
+++ b/plugins/reload/watcher.go
@@ -305,7 +305,7 @@ func (w *Watcher) pollEvents(serviceName string, files map[string]os.FileInfo) {
w.mu.Lock()
defer w.mu.Unlock()
- // Store create and remove events for use to check for rename events.
+ // InsertMany create and remove events for use to check for rename events.
creates := make(map[string]os.FileInfo)
removes := make(map[string]os.FileInfo)
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 22f02685..0424d52d 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -14,7 +14,10 @@ type Env map[string]string
// Server creates workers for the application.
type Server interface {
+ // CmdFactory return a new command based on the .rr.yaml server.command section
CmdFactory(env Env) (func() *exec.Cmd, error)
+ // NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
+ // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ef77f7ab..aab9dcde 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,9 +25,9 @@ import (
const PluginName = "server"
// RR_RELAY env variable key (internal)
-const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+const RR_RELAY = "RR_RELAY" //nolint:stylecheck
// RR_RPC env variable key (internal) if the RPC presents
-const RR_RPC = "RR_RPC" //nolint:golint,stylecheck
+const RR_RPC = "RR_RPC" //nolint:stylecheck
// Plugin manages worker
type Plugin struct {
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
new file mode 100644
index 00000000..18c63be3
--- /dev/null
+++ b/plugins/websockets/commands/enums.go
@@ -0,0 +1,9 @@
+package commands
+
+type Command string
+
+const (
+ Leave string = "leave"
+ Join string = "join"
+ Headers string = "headers"
+)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
new file mode 100644
index 00000000..be4aaa82
--- /dev/null
+++ b/plugins/websockets/config.go
@@ -0,0 +1,58 @@
+package websockets
+
+import (
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+)
+
+/*
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+
+ pubsubs:["redis", "amqp", "memory"]
+ # path used as websockets path
+ path: "/ws"
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"path"`
+ // ["redis", "amqp", "memory"]
+ PubSubs []string `mapstructure:"pubsubs"`
+ Middleware []string `mapstructure:"middleware"`
+
+ Pool *pool.Config `mapstructure:"pool"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+ if len(c.PubSubs) == 0 {
+ // memory used by default
+ c.PubSubs = append(c.PubSubs, "memory")
+ }
+
+ if c.Pool == nil {
+ c.Pool = &pool.Config{}
+ if c.Pool.NumWorkers == 0 {
+ // 2 workers by default
+ c.Pool.NumWorkers = 2
+ }
+
+ if c.Pool.AllocateTimeout == 0 {
+ c.Pool.AllocateTimeout = time.Minute
+ }
+
+ if c.Pool.DestroyTimeout == 0 {
+ c.Pool.DestroyTimeout = time.Minute
+ }
+ if c.Pool.Supervisor == nil {
+ return
+ }
+ c.Pool.Supervisor.InitDefaults()
+ }
+}
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
new file mode 100644
index 00000000..2b847173
--- /dev/null
+++ b/plugins/websockets/connection/connection.go
@@ -0,0 +1,67 @@
+package connection
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Connection represents wrapped and safe to use from the different threads websocket connection
+type Connection struct {
+ sync.RWMutex
+ log logger.Logger
+ conn *websocket.Conn
+}
+
+func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
+ return &Connection{
+ conn: wsConn,
+ log: log,
+ }
+}
+
+func (c *Connection) Write(mt int, data []byte) error {
+ c.Lock()
+ defer c.Unlock()
+
+ const op = errors.Op("websocket_write")
+ // handle a case when a goroutine tried to write into the closed connection
+ defer func() {
+ if r := recover(); r != nil {
+ c.log.Warn("panic handled, tried to write into the closed connection")
+ }
+ }()
+
+ err := c.conn.WriteMessage(mt, data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *Connection) Read() (int, []byte, error) {
+ const op = errors.Op("websocket_read")
+
+ mt, data, err := c.conn.ReadMessage()
+ if err != nil {
+ return -1, nil, errors.E(op, err)
+ }
+
+ return mt, data, nil
+}
+
+func (c *Connection) Close() error {
+ c.Lock()
+ defer c.Unlock()
+ const op = errors.Op("websocket_close")
+
+ err := c.conn.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
new file mode 100644
index 00000000..230870f2
--- /dev/null
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-27T20:56:56.848Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="Pt0MY_-SPz7R7foQA1VL" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go
new file mode 100644
index 00000000..fc214be8
--- /dev/null
+++ b/plugins/websockets/doc/doc.go
@@ -0,0 +1,27 @@
+package doc
+
+/*
+RPC message structure:
+
+type Msg struct {
+ // Topic message been pushed into.
+ Topics_ []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ Command_ string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker_ string `json:"broker"`
+
+ // Payload to be broadcasted
+ Payload_ []byte `json:"payload"`
+}
+
+1. Topics - string array (slice) with topics to join or leave
+2. Command - string, command to apply on the provided topics
+3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node -
+`redis` broker should be used.
+4. Payload - raw byte array to send to the subscribers (binary messages).
+
+
+*/
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..24ea19ce
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,226 @@
+package executor
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ sync.Mutex
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+
+ // map with the pubsub drivers
+ pubsub map[string]pubsub.PubSub
+ actualTopics map[string]struct{}
+
+ req *http.Request
+ accessValidator validator.AccessValidatorFn
+}
+
+// NewExecutor creates protected connection and starts command loop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage,
+ connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ accessValidator: av,
+ actualTopics: make(map[string]struct{}, 10),
+ req: r,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error { //nolint:gocognit
+ const op = errors.Op("executor_command_loop")
+ for {
+ mt, data, err := e.conn.Read()
+ if err != nil {
+ if mt == -1 {
+ e.log.Info("socket was closed", "reason", err, "message type", mt)
+ return nil
+ }
+
+ return errors.E(op, err)
+ }
+
+ msg := &pubsub.Message{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ // nil message, continue
+ if msg == nil {
+ e.log.Warn("get nil message, skipping")
+ continue
+ }
+
+ switch msg.Command {
+ // handle leave
+ case commands.Join:
+ e.log.Debug("get join command", "msg", msg)
+
+ val, err := e.accessValidator(e.req, msg.Topics...)
+ if err != nil {
+ if val != nil {
+ e.log.Debug("validation error", "status", val.Status, "headers", val.Header, "body", val.Body)
+ }
+
+ resp := &Response{
+ Topic: "#join",
+ Payload: msg.Topics,
+ }
+
+ packet, errJ := json.Marshal(resp)
+ if errJ != nil {
+ e.log.Error("error marshal the body", "error", errJ)
+ return errors.E(op, fmt.Errorf("%v,%v", err, errJ))
+ }
+
+ errW := e.conn.Write(websocket.BinaryMessage, packet)
+ if errW != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", errW)
+ return errors.E(op, fmt.Errorf("%v,%v", err, errW))
+ }
+
+ continue
+ }
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics,
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ return errors.E(op, err)
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ return errors.E(op, err)
+ }
+
+ // subscribe to the topic
+ if br, ok := e.pubsub[msg.Broker]; ok {
+ err = e.Set(br, msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ // handle leave
+ case commands.Leave:
+ e.log.Debug("get leave command", "msg", msg)
+
+ // prepare response
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics,
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ return errors.E(op, err)
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ return errors.E(op, err)
+ }
+
+ if br, ok := e.pubsub[msg.Broker]; ok {
+ err = e.Leave(br, msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ case commands.Headers:
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command)
+ }
+ }
+}
+
+func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+ // associate connection with topics
+ err := br.Subscribe(topics...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ // in case of error, unsubscribe connection from the dead topics
+ _ = br.Unsubscribe(topics...)
+ return err
+ }
+
+ e.storage.InsertMany(e.connID, topics)
+
+ // save topics for the connection
+ for i := 0; i < len(topics); i++ {
+ e.actualTopics[topics[i]] = struct{}{}
+ }
+
+ return nil
+}
+
+func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+ // remove associated connections from the storage
+ e.storage.RemoveMany(e.connID, topics)
+ err := br.Unsubscribe(topics...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ return err
+ }
+
+ // remove topics for the connection
+ for i := 0; i < len(topics); i++ {
+ delete(e.actualTopics, topics[i])
+ }
+
+ return nil
+}
+
+func (e *Executor) CleanUp() {
+ for topic := range e.actualTopics {
+ // remove from the bst
+ e.storage.Remove(e.connID, topic)
+
+ for _, ps := range e.pubsub {
+ _ = ps.Unsubscribe(topic)
+ }
+ }
+
+ for k := range e.actualTopics {
+ delete(e.actualTopics, k)
+ }
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
new file mode 100644
index 00000000..9b21ff8f
--- /dev/null
+++ b/plugins/websockets/plugin.go
@@ -0,0 +1,386 @@
+package websockets
+
+import (
+ "context"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ "github.com/google/uuid"
+ json "github.com/json-iterator/go"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+)
+
+const (
+ PluginName string = "websockets"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ // Collection with all available pubsubs
+ pubsubs map[string]pubsub.PubSub
+
+ cfg *Config
+ log logger.Logger
+
+ // global connections map
+ connections sync.Map
+ storage *storage.Storage
+
+ // GO workers pool
+ workersPool *pool.WorkersPool
+
+ wsUpgrade *websocket.Upgrader
+ serveExit chan struct{}
+
+ phpPool phpPool.Pool
+ server server.Server
+
+ // function used to validate access to the requested resource
+ accessValidator validator.AccessValidatorFn
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+ const op = errors.Op("websockets_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.cfg.InitDefault()
+
+ p.pubsubs = make(map[string]pubsub.PubSub)
+ p.log = log
+ p.storage = storage.NewStorage()
+ p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+ p.wsUpgrade = &websocket.Upgrader{
+ HandshakeTimeout: time.Second * 60,
+ }
+ p.serveExit = make(chan struct{})
+ p.server = server
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+
+ go func() {
+ var err error
+ p.Lock()
+ defer p.Unlock()
+
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, map[string]string{"RR_MODE": "http"})
+ if err != nil {
+ errCh <- err
+ }
+
+ p.accessValidator = p.defaultAccessValidator(p.phpPool)
+ }()
+
+ // run all pubsubs drivers
+ for _, v := range p.pubsubs {
+ go func(ps pubsub.PubSub) {
+ for {
+ select {
+ case <-p.serveExit:
+ return
+ default:
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
+ p.workersPool.Queue(data)
+ }
+ }
+ }(v)
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ // close workers pool
+ p.workersPool.Stop()
+ p.Lock()
+ p.phpPool.Destroy(context.Background())
+ p.Unlock()
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetPublishers,
+ }
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// GetPublishers collects all pubsubs
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
+ p.pubsubs[name.Name()] = pub
+}
+
+func (p *Plugin) Middleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != p.cfg.Path {
+ next.ServeHTTP(w, r)
+ return
+ }
+
+ // we need to lock here, because accessValidator might not be set in the Serve func at the moment
+ p.RLock()
+ // before we hijacked connection, we still can write to the response headers
+ val, err := p.accessValidator(r)
+ p.RUnlock()
+ if err != nil {
+ p.log.Error("validation error")
+ w.WriteHeader(400)
+ return
+ }
+
+ if val.Status != http.StatusOK {
+ for k, v := range val.Header {
+ for i := 0; i < len(v); i++ {
+ w.Header().Add(k, v[i])
+ }
+ }
+ w.WriteHeader(val.Status)
+ _, _ = w.Write(val.Body)
+ return
+ }
+
+ // upgrade connection to websocket connection
+ _conn, err := p.wsUpgrade.Upgrade(w, r, nil)
+ if err != nil {
+ // connection hijacked, do not use response.writer or request
+ p.log.Error("upgrade connection", "error", err)
+ return
+ }
+
+ // construct safe connection protected by mutexes
+ safeConn := connection.NewConnection(_conn, p.log)
+ // generate UUID from the connection
+ connectionID := uuid.NewString()
+ // store connection
+ p.connections.Store(connectionID, safeConn)
+
+ defer func() {
+ // close the connection on exit
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close", "error", err)
+ }
+
+ // when exiting - delete the connection
+ p.connections.Delete(connectionID)
+ }()
+
+ // Executor wraps a connection to have a safe abstraction
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r)
+ p.log.Info("websocket client connected", "uuid", connectionID)
+ defer e.CleanUp()
+
+ err = e.StartCommandLoop()
+ if err != nil {
+ p.log.Error("command loop error, disconnecting", "error", err.Error())
+ return
+ }
+
+ p.log.Info("disconnected", "connectionID", connectionID)
+ })
+}
+
+// Workers returns slice with the process states for the workers
+func (p *Plugin) Workers() []process.State {
+ p.RLock()
+ defer p.RUnlock()
+
+ workers := p.workers()
+
+ ps := make([]process.State, 0, len(workers))
+ for i := 0; i < len(workers); i++ {
+ state, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ return nil
+ }
+ ps = append(ps, state)
+ }
+
+ return ps
+}
+
+// internal
+func (p *Plugin) workers() []worker.BaseProcess {
+ return p.phpPool.Workers()
+}
+
+// Reset destroys the old pool and replaces it with new one, waiting for old pool to die
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
+ const op = errors.Op("ws_plugin_reset")
+ p.log.Info("WS plugin got restart request. Restarting...")
+ p.phpPool.Destroy(context.Background())
+ p.phpPool = nil
+
+ var err error
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, map[string]string{"RR_MODE": "http"})
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // attach validators
+ p.accessValidator = p.defaultAccessValidator(p.phpPool)
+
+ p.log.Info("WS plugin successfully restarted")
+ return nil
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(msg []*pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker]; ok {
+ err := br.Publish(msg)
+ if err != nil {
+ return errors.E(err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics); j++ {
+ err := p.pubsubs[msg[i].Broker].Publish(msg)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
+ return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
+ p.RLock()
+ defer p.RUnlock()
+ const op = errors.Op("access_validator")
+
+ p.log.Debug("validation", "topics", topics)
+ r = attributes.Init(r)
+
+ // if channels len is eq to 0, we use serverValidator
+ if len(topics) == 0 {
+ ctx, err := validator.ServerAccessValidator(r)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ val, err := exec(ctx, pool)
+ if err != nil {
+ return nil, errors.E(err)
+ }
+
+ return val, nil
+ }
+
+ ctx, err := validator.TopicsAccessValidator(r, topics...)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ val, err := exec(ctx, pool)
+ if err != nil {
+ return nil, errors.E(op)
+ }
+
+ if val.Status != http.StatusOK {
+ return val, errors.E(op, errors.Errorf("access forbidden, code: %d", val.Status))
+ }
+
+ return val, nil
+ }
+}
+
+// go:inline
+func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
+ const op = errors.Op("exec")
+ pd := payload.Payload{
+ Context: ctx,
+ }
+
+ resp, err := pool.Exec(pd)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ val := &validator.AccessValidator{
+ Body: resp.Body,
+ }
+
+ err = json.Unmarshal(resp.Context, val)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return val, nil
+}
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
new file mode 100644
index 00000000..8f18580f
--- /dev/null
+++ b/plugins/websockets/pool/workers_pool.go
@@ -0,0 +1,117 @@
+package pool
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type WorkersPool struct {
+ storage *storage.Storage
+ connections *sync.Map
+ resPool sync.Pool
+ log logger.Logger
+
+ queue chan *pubsub.Message
+ exit chan struct{}
+}
+
+// NewWorkersPool constructs worker pool for the websocket connections
+func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+ wp := &WorkersPool{
+ connections: connections,
+ queue: make(chan *pubsub.Message, 100),
+ storage: storage,
+ log: log,
+ exit: make(chan struct{}),
+ }
+
+ wp.resPool.New = func() interface{} {
+ return make(map[string]struct{}, 10)
+ }
+
+ // start 10 workers
+ for i := 0; i < 10; i++ {
+ wp.do()
+ }
+
+ return wp
+}
+
+func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+ wp.queue <- msg
+}
+
+func (wp *WorkersPool) Stop() {
+ for i := 0; i < 10; i++ {
+ wp.exit <- struct{}{}
+ }
+
+ close(wp.exit)
+}
+
+func (wp *WorkersPool) put(res map[string]struct{}) {
+ // optimized
+ // https://go-review.googlesource.com/c/go/+/110055/
+ // not O(n), but O(1)
+ for k := range res {
+ delete(res, k)
+ }
+}
+
+func (wp *WorkersPool) get() map[string]struct{} {
+ return wp.resPool.Get().(map[string]struct{})
+}
+
+func (wp *WorkersPool) do() { //nolint:gocognit
+ go func() {
+ for {
+ select {
+ case msg, ok := <-wp.queue:
+ if !ok {
+ return
+ }
+ // do not handle nil's
+ if msg == nil {
+ continue
+ }
+ if len(msg.Topics) == 0 {
+ continue
+ }
+ res := wp.get()
+ // get connections for the particular topic
+ wp.storage.GetByPtr(msg.Topics, res)
+ if len(res) == 0 {
+ wp.log.Info("no such topic", "topic", msg.Topics)
+ wp.put(res)
+ continue
+ }
+
+ for i := range res {
+ c, ok := wp.connections.Load(i)
+ if !ok {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
+ continue
+ }
+
+ conn := c.(*connection.Connection)
+ err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ if err != nil {
+ wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
+ wp.put(res)
+ continue
+ }
+ }
+
+ wp.put(res)
+ case <-wp.exit:
+ wp.log.Info("get exit signal, exiting from the workers pool")
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..2fb0f1b9
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,47 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+ r.log.Debug("message published", "msg", msg)
+
+ // just return in case of nil message
+ if msg == nil {
+ *ok = true
+ return nil
+ }
+
+ err := r.plugin.Publish(msg)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+ r.log.Debug("message published", "msg", msg)
+
+ // just return in case of nil message
+ if msg == nil {
+ *ok = true
+ return nil
+ }
+ // publish to the registered broker
+ r.plugin.PublishAsync(msg)
+
+ *ok = true
+ return nil
+}
diff --git a/plugins/websockets/schema/message.fbs b/plugins/websockets/schema/message.fbs
new file mode 100644
index 00000000..f2d92c78
--- /dev/null
+++ b/plugins/websockets/schema/message.fbs
@@ -0,0 +1,10 @@
+namespace message;
+
+table Message {
+ command:string;
+ broker:string;
+ topics:[string];
+ payload:[byte];
+}
+
+root_type Message;
diff --git a/plugins/websockets/schema/message/Message.go b/plugins/websockets/schema/message/Message.go
new file mode 100644
index 00000000..26bbd12c
--- /dev/null
+++ b/plugins/websockets/schema/message/Message.go
@@ -0,0 +1,118 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Message struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Message{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Message{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Message) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Message) Command() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Broker() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Topics(j int) []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
+ }
+ return nil
+}
+
+func (rcv *Message) TopicsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) Payload(j int) int8 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1))
+ }
+ return 0
+}
+
+func (rcv *Message) PayloadLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) MutatePayload(j int, n int8) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n)
+ }
+ return false
+}
+
+func MessageStart(builder *flatbuffers.Builder) {
+ builder.StartObject(4)
+}
+func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0)
+}
+func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0)
+}
+func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0)
+}
+func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
+}
+func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(1, numElems, 1)
+}
+func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
new file mode 100644
index 00000000..ac256be2
--- /dev/null
+++ b/plugins/websockets/storage/storage.go
@@ -0,0 +1,79 @@
+package storage
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/bst"
+)
+
+type Storage struct {
+ sync.RWMutex
+ BST bst.Storage
+}
+
+func NewStorage() *Storage {
+ return &Storage{
+ BST: bst.NewBST(),
+ }
+}
+
+func (s *Storage) InsertMany(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Insert(connID, topics[i])
+ }
+}
+
+func (s *Storage) Insert(connID string, topic string) {
+ s.Lock()
+ defer s.Unlock()
+
+ s.BST.Insert(connID, topic)
+}
+
+func (s *Storage) RemoveMany(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Remove(connID, topics[i])
+ }
+}
+
+func (s *Storage) Remove(connID string, topic string) {
+ s.Lock()
+ defer s.Unlock()
+
+ s.BST.Remove(connID, topic)
+}
+
+// GetByPtrTS Thread safe get
+func (s *Storage) GetByPtrTS(topics []string, res map[string]struct{}) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
+
+func (s *Storage) GetByPtr(topics []string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
diff --git a/plugins/websockets/storage/storage_test.go b/plugins/websockets/storage/storage_test.go
new file mode 100644
index 00000000..4072992a
--- /dev/null
+++ b/plugins/websockets/storage/storage_test.go
@@ -0,0 +1,299 @@
+package storage
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+const predifined = "chat-1-2"
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewStorage()
+
+ for i := 0; i < 100; i++ {
+ g.InsertMany(uuid.NewString(), []string{"comments"})
+ }
+
+ for i := 0; i < 100; i++ {
+ g.InsertMany(uuid.NewString(), []string{"comments2"})
+ }
+
+ for i := 0; i < 100; i++ {
+ g.InsertMany(uuid.NewString(), []string{"comments3"})
+ }
+
+ res := make(map[string]struct{}, 100)
+ assert.Len(t, res, 0)
+
+ // should be 100
+ g.GetByPtr([]string{"comments"}, res)
+ assert.Len(t, res, 100)
+
+ res = make(map[string]struct{}, 100)
+ assert.Len(t, res, 0)
+
+ // should be 100
+ g.GetByPtr([]string{"comments2"}, res)
+ assert.Len(t, res, 100)
+
+ res = make(map[string]struct{}, 100)
+ assert.Len(t, res, 0)
+
+ // should be 100
+ g.GetByPtr([]string{"comments3"}, res)
+ assert.Len(t, res, 100)
+}
+
+func BenchmarkGraph(b *testing.B) {
+ g := NewStorage()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.InsertMany(uuid.NewString(), []string{uid})
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ res := make(map[string]struct{})
+
+ for i := 0; i < b.N; i++ {
+ g.GetByPtr([]string{predifined}, res)
+
+ for i := range res {
+ delete(res, i)
+ }
+ }
+}
+
+func BenchmarkBigSearch(b *testing.B) {
+ g1 := NewStorage()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ res := make(map[string]struct{}, 333)
+
+ for i := 0; i < b.N; i++ {
+ g1.GetByPtr(predefinedSlice, res)
+
+ for i := range res {
+ delete(res, i)
+ }
+ }
+}
+
+func BenchmarkBigSearchWithRemoves(b *testing.B) {
+ g1 := NewStorage()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ b.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ go func() {
+ tt := time.NewTicker(time.Microsecond)
+
+ res := make(map[string]struct{}, 1000)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(1000) //nolint:gosec
+ g1.GetByPtr(predefinedSlice, res)
+ for k := range res {
+ g1.Remove(k, predefinedSlice[num])
+ }
+ }
+ }
+ }()
+
+ res := make(map[string]struct{}, 100)
+
+ for i := 0; i < b.N; i++ {
+ g1.GetByPtr(predefinedSlice, res)
+
+ for i := range res {
+ delete(res, i)
+ }
+ }
+}
+
+func TestBigSearchWithRemoves(t *testing.T) {
+ g1 := NewStorage()
+
+ predefinedSlice := make([]string, 0, 1000)
+ for i := 0; i < 1000; i++ {
+ predefinedSlice = append(predefinedSlice, uuid.NewString())
+ }
+ if predefinedSlice == nil {
+ t.FailNow()
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), uuid.NewString())
+ }
+
+ for i := 0; i < 1000; i++ {
+ g1.Insert(uuid.NewString(), predefinedSlice[i])
+ }
+
+ stopCh := make(chan struct{})
+
+ go func() {
+ tt := time.NewTicker(time.Microsecond)
+
+ res := make(map[string]struct{}, 1000)
+ for {
+ select {
+ case <-tt.C:
+ num := rand.Intn(1000) //nolint:gosec
+ g1.GetByPtr(predefinedSlice, res)
+ for k := range res {
+ g1.Remove(k, predefinedSlice[num])
+ }
+
+ case <-stopCh:
+ tt.Stop()
+ return
+ }
+ }
+ }()
+
+ res := make(map[string]struct{}, 100)
+
+ for i := 0; i < 1000; i++ {
+ g1.GetByPtr(predefinedSlice, res)
+
+ for i := range res {
+ delete(res, i)
+ }
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestGraph(t *testing.T) {
+ g := NewStorage()
+
+ for i := 0; i < 1000; i++ {
+ uid := uuid.New().String()
+ g.Insert(uuid.NewString(), uid)
+ }
+
+ g.Insert(uuid.NewString(), predifined)
+
+ res := make(map[string]struct{})
+
+ g.GetByPtr([]string{predifined}, res)
+ assert.NotEmpty(t, res)
+ assert.Len(t, res, 1)
+}
+
+func TestTreeConcurrentContains(t *testing.T) {
+ g := NewStorage()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ res := make(map[string]struct{}, 100)
+
+ for i := 0; i < 100; i++ {
+ go func() {
+ g.GetByPtrTS([]string{predifined}, res)
+ }()
+
+ go func() {
+ g.GetByPtrTS([]string{predifined}, res)
+ }()
+
+ go func() {
+ g.GetByPtrTS([]string{predifined}, res)
+ }()
+
+ go func() {
+ g.GetByPtrTS([]string{predifined}, res)
+ }()
+ }
+
+ time.Sleep(time.Second * 5)
+
+ res2 := make(map[string]struct{}, 5)
+
+ g.GetByPtr([]string{predifined}, res2)
+ assert.NotEmpty(t, res2)
+ assert.Len(t, res2, 5)
+}
+
+func TestGraphRemove(t *testing.T) {
+ g := NewStorage()
+
+ key1 := uuid.NewString()
+ key2 := uuid.NewString()
+ key3 := uuid.NewString()
+ key4 := uuid.NewString()
+ key5 := uuid.NewString()
+
+ g.Insert(key1, predifined)
+ g.Insert(key2, predifined)
+ g.Insert(key3, predifined)
+ g.Insert(key4, predifined)
+ g.Insert(key5, predifined)
+
+ res := make(map[string]struct{}, 5)
+ g.GetByPtr([]string{predifined}, res)
+ assert.NotEmpty(t, res)
+ assert.Len(t, res, 5)
+
+ g.Remove(key1, predifined)
+
+ res2 := make(map[string]struct{}, 4)
+ g.GetByPtr([]string{predifined}, res2)
+ assert.NotEmpty(t, res2)
+ assert.Len(t, res2, 4)
+}
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
new file mode 100644
index 00000000..e666f846
--- /dev/null
+++ b/plugins/websockets/validator/access_validator.go
@@ -0,0 +1,76 @@
+package validator
+
+import (
+ "net/http"
+ "strings"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+)
+
+type AccessValidatorFn = func(r *http.Request, channels ...string) (*AccessValidator, error)
+
+type AccessValidator struct {
+ Header http.Header `json:"headers"`
+ Status int `json:"status"`
+ Body []byte
+}
+
+func ServerAccessValidator(r *http.Request) ([]byte, error) {
+ const op = errors.Op("server_access_validator")
+
+ err := attributes.Set(r, "ws:joinServer", true)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ defer delete(attributes.All(r), "ws:joinServer")
+
+ req := &handler.Request{
+ RemoteAddr: handler.FetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: handler.URI(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ Attributes: attributes.All(r),
+ }
+
+ data, err := json.Marshal(req)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return data, nil
+}
+
+func TopicsAccessValidator(r *http.Request, topics ...string) ([]byte, error) {
+ const op = errors.Op("topic_access_validator")
+ err := attributes.Set(r, "ws:joinTopics", strings.Join(topics, ","))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ defer delete(attributes.All(r), "ws:joinTopics")
+
+ req := &handler.Request{
+ RemoteAddr: handler.FetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: handler.URI(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ Attributes: attributes.All(r),
+ }
+
+ data, err := json.Marshal(req)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return data, nil
+}
diff --git a/tests/Dockerfile b/tests/Dockerfile
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/Dockerfile
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 575fe656..f6533dc4 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -12,8 +12,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
"net/http"
diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go
index 15c82839..d75620f3 100644
--- a/tests/plugins/http/parse_test.go
+++ b/tests/plugins/http/parse_test.go
@@ -3,7 +3,7 @@ package http
import (
"testing"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
)
var samples = []struct {
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 3564d9cd..276c22ef 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"github.com/spiral/roadrunner/v2/pkg/payload"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index 5c39589c..903a930a 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -18,8 +18,8 @@ import (
j "github.com/json-iterator/go"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml
index e1edbb44..94c9a856 100644
--- a/tests/plugins/informer/.rr-informer.yaml
+++ b/tests/plugins/informer/.rr-informer.yaml
@@ -3,8 +3,8 @@ server:
user: ""
group: ""
env:
- "RR_CONFIG": "/some/place/on/the/C134"
- "RR_CONFIG2": "C138"
+ - RR_CONFIG: "/some/place/on/the/C134"
+ - RR_CONFIG: "C138"
relay: "pipes"
relay_timeout: "20s"
@@ -12,4 +12,4 @@ rpc:
listen: tcp://127.0.0.1:6001
logs:
mode: development
- level: error \ No newline at end of file
+ level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
new file mode 100644
index 00000000..dc073be3
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -0,0 +1,39 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:11111
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # pubsubs might use general config section or its own
+ pubsubs: [ "redis" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
new file mode 100644
index 00000000..896cee05
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
@@ -0,0 +1,37 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../worker-ok.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:11113
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ pubsubs: [ "memory" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
new file mode 100644
index 00000000..e3bf5218
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
@@ -0,0 +1,37 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../worker-deny.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:11112
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ pubsubs: [ "memory" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
new file mode 100644
index 00000000..0614f4e7
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
@@ -0,0 +1,37 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../worker-stop.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:11114
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ pubsubs: [ "memory" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml
new file mode 100644
index 00000000..eedf5377
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml
@@ -0,0 +1,39 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:13235
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # pubsubs might use general config section
+ pubsubs: [ "redis", "memory" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
new file mode 100644
index 00000000..772b53ac
--- /dev/null
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -0,0 +1,885 @@
+package websockets
+
+import (
+ "net"
+ "net/http"
+ "net/rpc"
+ "net/url"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ endure "github.com/spiral/endure/pkg/container"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/stretchr/testify/assert"
+)
+
+type Msg struct {
+ // Topic message been pushed into.
+ Topics []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ Command string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker string `json:"broker"`
+
+ // Payload to be broadcasted
+ Payload []byte `json:"payload"`
+}
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("TestWSInit", wsInit)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func wsInit(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func TestWSRedisAndMemory(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-redis-memory.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemory", RPCWsMemory)
+ t.Run("RPCWsRedis", RPCWsRedis)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func RPCWsMemoryPubAsync(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publishAsync("", "memory", "foo")
+
+ // VERIFY a message
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publishAsync("", "memory", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publishAsync2("", "memory", "foo2")
+ }()
+
+ // should be only message from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP2", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func RPCWsMemory(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publish("", "memory", "foo")
+
+ // VERIFY a message
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publish("", "memory", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publish2("", "memory", "foo2")
+ }()
+
+ // should be only message from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP2", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func RPCWsRedis(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ d, err := json.Marshal(message("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publish("", "redis", "foo")
+
+ // VERIFY a message
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "redis", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publish("", "redis", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publish2("", "redis", "foo2")
+ }()
+
+ // should be only message from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP2", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func TestWSMemoryDeny(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-memory-deny.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryDeny", RPCWsMemoryDeny)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func RPCWsMemoryDeny(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11112", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
+
+ defer func() {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func TestWSMemoryStop(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-memory-stop.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryStop", RPCWsMemoryStop)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func RPCWsMemoryStop(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11114", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NotNil(t, resp)
+ assert.Error(t, err)
+ assert.Nil(t, c)
+ assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck
+ assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck
+ if resp != nil && resp.Body != nil { //nolint:staticcheck
+ _ = resp.Body.Close()
+ }
+}
+
+func TestWSMemoryOk(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-memory-allow.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryAllow", RPCWsMemoryAllow)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func RPCWsMemoryAllow(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11113", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
+
+ defer func() {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ d, err := json.Marshal(message("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publish("", "memory", "foo")
+
+ // VERIFY a message
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(message("leave", "memory", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publish("", "memory", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publish2("", "memory", "foo2")
+ }()
+
+ // should be only message from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "hello, PHP2", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func publish(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func publishAsync(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func publishAsync2(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.PublishAsync", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func publish2(command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ var ret bool
+ err = client.Call("websockets.Publish", []*Msg{message(command, broker, []byte("hello, PHP2"), topics...)}, &ret)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func message(command string, broker string, payload []byte, topics ...string) *Msg {
+ return &Msg{
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ }
+}
diff --git a/tests/worker-deny.php b/tests/worker-deny.php
new file mode 100644
index 00000000..6dc993f6
--- /dev/null
+++ b/tests/worker-deny.php
@@ -0,0 +1,30 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ if ($req->getAttribute('ws:joinServer')) {
+ $psr7->respond($resp->withStatus(200));
+ } else {
+ $psr7->respond($resp->withStatus(401));
+ }
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/tests/worker-ok.php b/tests/worker-ok.php
new file mode 100644
index 00000000..63558b0f
--- /dev/null
+++ b/tests/worker-ok.php
@@ -0,0 +1,27 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? '');
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/tests/worker-stop.php b/tests/worker-stop.php
new file mode 100644
index 00000000..83fc5710
--- /dev/null
+++ b/tests/worker-stop.php
@@ -0,0 +1,26 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401));
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}
diff --git a/utils/network.go b/utils/network.go
index b73363db..86a7e733 100755
--- a/utils/network.go
+++ b/utils/network.go
@@ -12,7 +12,8 @@ import (
"github.com/valyala/tcplisten"
)
-// - SO_REUSEPORT. This option allows linear scaling server performance
+// CreateListener
+// - SO_REUSEPORT. This option allows linear scaling server performance
// on multi-CPU servers.
// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details.
//