summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-21 08:49:47 +0300
committerGitHub <[email protected]>2021-06-21 08:49:47 +0300
commit591b69be9279158cfe9e7083152d308c4ee480b7 (patch)
tree5c6b1d4ee2aea4e6a1cc828ca1fcb2306ef9741e
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
parent2ab22ac9e935efb126b51e9c3521073e6a5155a1 (diff)
#732 feat(plugin): rework `broadcast` plugin
#732 feat(plugin): rework `broadcast` plugin
-rw-r--r--.github/workflows/linux.yml1
-rw-r--r--.github/workflows/windows.yml1
-rw-r--r--CHANGELOG.md4
-rwxr-xr-xMakefile3
-rw-r--r--pkg/pubsub/interface.go18
-rw-r--r--pkg/pubsub/psmessage.go15
-rw-r--r--plugins/broadcast/config.go25
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go208
-rw-r--r--plugins/broadcast/rpc.go87
-rw-r--r--plugins/kv/config.go2
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go2
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/interface.go15
-rw-r--r--plugins/kv/plugin.go60
-rw-r--r--plugins/kv/rpc.go2
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go4
-rw-r--r--plugins/memory/pubsub.go35
-rw-r--r--plugins/redis/channel.go97
-rw-r--r--plugins/redis/fanin.go102
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub.go56
-rw-r--r--plugins/websockets/config.go71
-rw-r--r--plugins/websockets/executor/executor.go66
-rw-r--r--plugins/websockets/origin_test.go12
-rw-r--r--plugins/websockets/plugin.go190
-rw-r--r--plugins/websockets/pool/workers_pool.go90
-rw-r--r--plugins/websockets/rpc.go75
-rw-r--r--proto/kv/v1beta/kv.pb.go (renamed from pkg/proto/kv/v1beta/kv.pb.go)0
-rw-r--r--proto/kv/v1beta/kv.proto (renamed from pkg/proto/kv/v1beta/kv.proto)0
-rw-r--r--proto/websockets/v1beta/websockets.pb.go (renamed from pkg/proto/websockets/v1beta/websockets.pb.go)42
-rw-r--r--proto/websockets/v1beta/websockets.proto (renamed from pkg/proto/websockets/v1beta/websockets.proto)5
-rw-r--r--tests/docker-compose.yaml4
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go474
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml33
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-global.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml)21
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-init.yaml35
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml29
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml43
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go67
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go64
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go64
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go64
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go64
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go64
-rw-r--r--tests/plugins/kv/storage_plugin_test.go2
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-allow.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml)9
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-allow2.yaml44
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml)6
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-deny.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml)11
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-deny2.yaml40
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml10
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml)12
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-stop.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml)9
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go710
60 files changed, 2154 insertions, 1037 deletions
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 69269557..62987771 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -91,6 +91,7 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
docker-compose -f ./tests/docker-compose.yaml down
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
index 227c725b..f23f9b5d 100644
--- a/.github/workflows/windows.yml
+++ b/.github/workflows/windows.yml
@@ -90,6 +90,7 @@ jobs:
go test -v -race ./tests/plugins/resetter
go test -v -race ./tests/plugins/rpc
go test -v -race ./tests/plugins/kv
+ go test -v -race ./tests/plugins/broadcast
go test -v -race ./tests/plugins/websockets
go test -v -race ./plugins/websockets
docker-compose -f ./tests/docker-compose.yaml down
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f7cf2cc..1f8e1733 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,10 @@ CHANGELOG
v2.3.1 (_.06.2021)
-------------------
+## 👀 New:
+
+- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
+
## 🩹 Fixes:
- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
diff --git a/Makefile b/Makefile
index fbd5ab76..bcfce79d 100755
--- a/Makefile
+++ b/Makefile
@@ -31,6 +31,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/resetter.out -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/rpc.out -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/kv_plugin.out -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/broadcast_plugin.out -covermode=atomic ./tests/plugins/broadcast
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_plugin.out -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_origin.out -covermode=atomic ./plugins/websockets
cat ./coverage/*.out > ./coverage/summary.out
@@ -61,6 +62,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/resetter
go test -v -race -tags=debug ./tests/plugins/rpc
go test -v -race -tags=debug ./tests/plugins/kv
+ go test -v -race -tags=debug ./tests/plugins/broadcast
go test -v -race -tags=debug ./tests/plugins/websockets
go test -v -race -tags=debug ./plugins/websockets
docker-compose -f tests/docker-compose.yaml down
@@ -91,5 +93,6 @@ testGo1.17beta1: ## Run application tests
go1.17beta1 test -v -race -tags=debug ./tests/plugins/rpc
go1.17beta1 test -v -race -tags=debug ./tests/plugins/kv
go1.17beta1 test -v -race -tags=debug ./tests/plugins/websockets
+ go1.17beta1 test -v -race -tags=debug ./tests/plugins/broadcast
go1.17beta1 test -v -race -tags=debug ./plugins/websockets
docker-compose -f tests/docker-compose.yaml down
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index d021dbbe..06252d70 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,7 +1,5 @@
package pubsub
-import websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
-
/*
This interface is in BETA. It might be changed.
*/
@@ -16,6 +14,11 @@ type PubSub interface {
Reader
}
+type SubReader interface {
+ Subscriber
+ Reader
+}
+
// Subscriber defines the ability to operate as message passing broker.
// BETA interface
type Subscriber interface {
@@ -33,18 +36,19 @@ type Subscriber interface {
// BETA interface
type Publisher interface {
// Publish one or multiple Channel.
- Publish(messages []byte) error
+ Publish(message *Message) error
// PublishAsync publish message and return immediately
// If error occurred it will be printed into the logger
- PublishAsync(messages []byte)
+ PublishAsync(message *Message)
}
// Reader interface should return next message
type Reader interface {
- Next() (*websocketsv1.Message, error)
+ Next() (*Message, error)
}
-type PSProvider interface {
- PSProvide(key string) (PubSub, error)
+// Constructor is a special pub-sub interface made to return a constructed PubSub type
+type Constructor interface {
+ PSConstruct(key string) (PubSub, error)
}
diff --git a/pkg/pubsub/psmessage.go b/pkg/pubsub/psmessage.go
new file mode 100644
index 00000000..e33d9284
--- /dev/null
+++ b/pkg/pubsub/psmessage.go
@@ -0,0 +1,15 @@
+package pubsub
+
+import json "github.com/json-iterator/go"
+
+// Message represents a single message with payload bound to a particular topic
+type Message struct {
+ // Topic (channel in terms of redis)
+ Topic string `json:"topic"`
+ // Payload (on some decode stages might be represented as base64 string)
+ Payload []byte `json:"payload"`
+}
+
+func (m *Message) MarshalBinary() (data []byte, err error) {
+ return json.Marshal(m)
+}
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..4f1e5213
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,25 @@
+package broadcast
+
+/*
+
+# Global redis config (priority - 2)
+
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ # local redis config (priority - 1)
+ test:
+ driver: memory
+
+
+priority local -> global
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..fd5ff1f9
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..46709d71
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) (pubsub.SubReader, error)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..6ddef806
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,208 @@
+package broadcast
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
+
+type Plugin struct {
+ sync.RWMutex
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.PubSub
+ constructors map[string]pubsub.Constructor
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.constructors = make(map[string]pubsub.Constructor)
+
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) {
+ // key redis, value - interface
+ p.constructors[name.Name()] = constructor
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m *pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ p.log.Error("publishAsync", "error", err)
+ // continue publish to other registered publishers
+ continue
+ }
+ }
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+ const op = errors.Op("broadcast_plugin_get_driver")
+
+ // choose a driver
+ if val, ok := p.cfg.Data[key]; ok {
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := val.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val)))
+ }
+ default:
+ return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, key)
+
+ switch val.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.constructors[memory]; !ok {
+ return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
+ }
+ ps, err := p.constructors[memory].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
+ p.publishers[uuid.NewString()] = ps
+
+ return ps, nil
+ case redis:
+ if _, ok := p.constructors[redis]; !ok {
+ return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+
+ // then try global if local does not exist
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+ }
+ }
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..2ee211f8
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,87 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: root/proto
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.String())
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ err := r.plugin.Publish(tmp)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: root/proto
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.GetMessages())
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ r.plugin.PublishAsync(tmp)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
diff --git a/plugins/kv/config.go b/plugins/kv/config.go
index 66095817..09ba79cd 100644
--- a/plugins/kv/config.go
+++ b/plugins/kv/config.go
@@ -1,6 +1,6 @@
package kv
-// Config represents general storage configuration with keys as the user defined kv-names and values as the drivers
+// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors
type Config struct {
Data map[string]interface{} `mapstructure:"kv"`
}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 5f4d98b1..4b675271 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,10 +9,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 28e2a89c..6ae1a1f6 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index c1f79cbb..a2787d72 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,10 +6,10 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 936b2047..22ea5cca 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 5aedd5c3..ffdbbe62 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
+import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -29,13 +29,8 @@ type Storage interface {
Delete(keys ...string) error
}
-// StorageDriver interface provide storage
-type StorageDriver interface {
- Provider
-}
-
-// Provider provides storage based on the config
-type Provider interface {
- // Provide provides Storage based on the config key
- KVProvide(key string) (Storage, error)
+// Constructor provides storage based on the config
+type Constructor interface {
+ // KVConstruct provides Storage based on the config key
+ KVConstruct(key string) (Storage, error)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index efe92252..03dbaed6 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -24,8 +24,8 @@ const (
// Plugin for the unified storage
type Plugin struct {
log logger.Logger
- // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
- drivers map[string]StorageDriver
+ // constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
+ constructors map[string]Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
storages map[string]Storage
// KV configuration
@@ -43,7 +43,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.drivers = make(map[string]StorageDriver, 5)
+ p.constructors = make(map[string]Constructor, 5)
p.storages = make(map[string]Storage, 5)
p.log = log
p.cfgPlugin = cfg
@@ -81,13 +81,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
addr: [ "localhost:11211" ]
- For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
when user requests for example boltdb-south, we should provide that particular preconfigured storage
*/
for k, v := range p.cfg.Data {
- if _, ok := v.(map[string]interface{})[driver]; !ok {
- errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
- return errCh
+ // for example if the key not properly formatted (yaml)
+ if v == nil {
+ continue
+ }
+
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
}
// config key for the particular sub-driver kv.memcached
@@ -95,12 +109,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
- if _, ok := p.drivers[memcached]; !ok {
- p.log.Warn("no memcached drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memcached]; !ok {
+ p.log.Warn("no memcached constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memcached].KVProvide(configKey)
+ storage, err := p.constructors[memcached].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -110,12 +124,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.storages[k] = storage
case boltdb:
- if _, ok := p.drivers[boltdb]; !ok {
- p.log.Warn("no boltdb drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[boltdb]; !ok {
+ p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[boltdb].KVProvide(configKey)
+ storage, err := p.constructors[boltdb].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -124,12 +138,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case memory:
- if _, ok := p.drivers[memory]; !ok {
- p.log.Warn("no in-memory drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memory]; !ok {
+ p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memory].KVProvide(configKey)
+ storage, err := p.constructors[memory].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -138,15 +152,15 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case redis:
- if _, ok := p.drivers[redis]; !ok {
- p.log.Warn("no redis drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[redis]; !ok {
+ p.log.Warn("no redis constructors registered", "registered", p.constructors)
continue
}
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -155,7 +169,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case p.cfgPlugin.Has(redis):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -189,9 +203,9 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) {
- // save the storage driver
- p.drivers[name.Name()] = storage
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+ // save the storage constructor
+ p.constructors[name.Name()] = constructor
}
// RPC returns associated rpc service.
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index ab1f7f31..af763600 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,8 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
// Wrapper for the plugin
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 9b7d7259..1cf031d1 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,10 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6151ebf0..70badf15 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -41,11 +41,11 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key)
}
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
if err != nil {
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 75cd9d24..d027a8a5 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -4,16 +4,14 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
sync.RWMutex
// channel with the messages from the RPC
- pushCh chan []byte
+ pushCh chan *pubsub.Message
// user-subscribed topics
storage bst.Storage
log logger.Logger
@@ -21,21 +19,21 @@ type PubSubDriver struct {
func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
ps := &PubSubDriver{
- pushCh: make(chan []byte, 10),
+ pushCh: make(chan *pubsub.Message, 10),
storage: bst.NewBST(),
log: log,
}
return ps, nil
}
-func (p *PubSubDriver) Publish(message []byte) error {
- p.pushCh <- message
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+ p.pushCh <- msg
return nil
}
-func (p *PubSubDriver) PublishAsync(message []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
- p.pushCh <- message
+ p.pushCh <- msg
}()
}
@@ -67,7 +65,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,20 +74,13 @@ func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return nil, err
- }
-
- // push only messages, which are subscribed
+ // push only messages, which topics are subscibed
// TODO better???
- for i := 0; i < len(m.GetTopics()); i++ {
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(m.GetTopics()[i]); ok {
- return m, nil
- }
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
}
+
return nil, nil
}
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
new file mode 100644
index 00000000..5817853c
--- /dev/null
+++ b/plugins/redis/channel.go
@@ -0,0 +1,97 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type redisChannel struct {
+ sync.Mutex
+
+ // redis client
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan *pubsub.Message
+
+ exit chan struct{}
+}
+
+func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel {
+ out := make(chan *pubsub.Message, 100)
+ fi := &redisChannel{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (r *redisChannel) sub(topics ...string) error {
+ const op = errors.Op("redis_sub")
+ err := r.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (r *redisChannel) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the pubsub.Message structure
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-r.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+
+ r.out <- &pubsub.Message{
+ Topic: msg.Channel,
+ Payload: utils.AsBytes(msg.Payload),
+ }
+
+ case <-r.exit:
+ return
+ }
+ }
+}
+
+func (r *redisChannel) unsub(topic string) error {
+ const op = errors.Op("redis_unsub")
+ err := r.pubsub.Unsubscribe(context.Background(), topic)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (r *redisChannel) stop() error {
+ r.exit <- struct{}{}
+ close(r.out)
+ close(r.exit)
+ return nil
+}
+
+func (r *redisChannel) message() *pubsub.Message {
+ return <-r.out
+}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
deleted file mode 100644
index ac9ebcc2..00000000
--- a/plugins/redis/fanin.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package redis
-
-import (
- "context"
- "sync"
-
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type FanIn struct {
- sync.Mutex
-
- // redis client
- client redis.UniversalClient
- pubsub *redis.PubSub
-
- log logger.Logger
-
- // out channel with all subs
- out chan *websocketsv1.Message
-
- exit chan struct{}
-}
-
-func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *websocketsv1.Message, 100)
- fi := &FanIn{
- out: out,
- client: redisClient,
- pubsub: redisClient.Subscribe(context.Background()),
- exit: make(chan struct{}),
- log: log,
- }
-
- // start reading messages
- go fi.read()
-
- return fi
-}
-
-func (fi *FanIn) sub(topics ...string) error {
- const op = errors.Op("fanin_addchannel")
- err := fi.pubsub.Subscribe(context.Background(), topics...)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// read reads messages from the pubsub subscription
-func (fi *FanIn) read() {
- for {
- select {
- // here we receive message from us (which we sent before in Publish)
- // it should be compatible with the websockets.Msg interface
- // payload should be in the redis.message.payload field
-
- case msg, ok := <-fi.pubsub.Channel():
- // channel closed
- if !ok {
- return
- }
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("message unmarshal")
- continue
- }
-
- fi.out <- m
- case <-fi.exit:
- return
- }
- }
-}
-
-func (fi *FanIn) unsub(topic string) error {
- const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topic)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (fi *FanIn) stop() error {
- fi.exit <- struct{}{}
- close(fi.out)
- close(fi.exit)
- return nil
-}
-
-func (fi *FanIn) consume() <-chan *websocketsv1.Message {
- return fi.out
-}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 66cb8384..320b7443 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,10 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24c21b55..9d98790b 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -59,8 +59,8 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-// KVProvide provides KV storage implementation over the redis plugin
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+// KVConstruct provides KV storage implementation over the redis plugin
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
@@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
return st, nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index dbda7ea4..4e41acb5 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,11 +6,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
@@ -18,7 +16,7 @@ type PubSubDriver struct {
cfg *Config `mapstructure:"redis"`
log logger.Logger
- fanin *FanIn
+ channel *redisChannel
universalClient redis.UniversalClient
stopCh chan struct{}
}
@@ -62,7 +60,12 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
MasterName: ps.cfg.MasterName,
})
- ps.fanin = newFanIn(ps.universalClient, log)
+ statusCmd := ps.universalClient.Ping(context.Background())
+ if statusCmd.Err() != nil {
+ return nil, statusCmd.Err()
+ }
+
+ ps.channel = newRedisChannel(ps.universalClient, log)
ps.stop()
@@ -72,47 +75,32 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
func (p *PubSubDriver) stop() {
go func() {
for range p.stopCh {
- _ = p.fanin.stop()
+ _ = p.channel.stop()
return
}
}()
}
-func (p *PubSubDriver) Publish(msg []byte) error {
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ return f.Err()
}
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
return nil
}
-func (p *PubSubDriver) PublishAsync(msg []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
}
}()
}
@@ -128,13 +116,13 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
return err
}
if res == 0 {
- p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i])
continue
}
}
// and subscribe after
- return p.fanin.sub(topics...)
+ return p.channel.sub(topics...)
}
func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
@@ -160,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error
}
// else - unsubscribe
- err = p.fanin.unsub(topics[i])
+ err = p.channel.unsub(topics[i])
if err != nil {
return err
}
@@ -176,7 +164,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
panic(err)
}
- // assighn connections
+ // assign connections
// res expected to be from the sync.Pool
for k := range r {
res[k] = struct{}{}
@@ -184,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
- return <-p.fanin.consume(), nil
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
+ return p.channel.message(), nil
}
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index deb4406c..933a12e0 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -4,80 +4,42 @@ import (
"strings"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
)
/*
-# GLOBAL
-redis:
- addrs:
- - 'localhost:6379'
-
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
-
- pubsubs:["redis", "amqp", "memory"]
- # OR local
- redis:
- addrs:
- - 'localhost:6379'
-
- # path used as websockets path
+ broker: default
+ allowed_origin: "*"
path: "/ws"
*/
-type RedisConfig struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
- Path string `mapstructure:"path"`
- // ["redis", "amqp", "memory"]
- PubSubs []string `mapstructure:"pubsubs"`
- Middleware []string `mapstructure:"middleware"`
-
+ Path string `mapstructure:"path"`
AllowedOrigin string `mapstructure:"allowed_origin"`
+ Broker string `mapstructure:"broker"`
// wildcard origin
allowedWOrigins []wildcard
allowedOrigins []string
allowedAll bool
- Redis *RedisConfig `mapstructure:"redis"`
- Pool *pool.Config `mapstructure:"pool"`
+ // Pool with the workers for the websockets
+ Pool *pool.Config `mapstructure:"pool"`
}
// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
+func (c *Config) InitDefault() error {
if c.Path == "" {
c.Path = "/ws"
}
- if len(c.PubSubs) == 0 {
- // memory used by default
- c.PubSubs = append(c.PubSubs, "memory")
+ // broker is mandatory
+ if c.Broker == "" {
+ return errors.Str("broker key should be specified")
}
if c.Pool == nil {
@@ -99,13 +61,6 @@ func (c *Config) InitDefault() {
}
}
- if c.Redis != nil {
- if c.Redis.Addrs == nil {
- // append default
- c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
- }
- }
-
if c.AllowedOrigin == "" {
c.AllowedOrigin = "*"
}
@@ -115,7 +70,7 @@ func (c *Config) InitDefault() {
if origin == "*" {
// If "*" is present in the list, turn the whole list into a match all
c.allowedAll = true
- return
+ return nil
} else if i := strings.IndexByte(origin, '*'); i >= 0 {
// Split the origin in two: start and end string without the *
w := wildcard{origin[0:i], origin[i+1:]}
@@ -123,4 +78,6 @@ func (c *Config) InitDefault() {
} else {
c.allowedOrigins = append(c.allowedOrigins, origin)
}
+
+ return nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 5f904d26..664b4dfd 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,12 +7,12 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
type Response struct {
@@ -22,14 +22,15 @@ type Response struct {
type Executor struct {
sync.Mutex
+ // raw ws connection
conn *connection.Connection
log logger.Logger
// associated connection ID
connID string
- // map with the pubsub drivers
- pubsub map[string]pubsub.PubSub
+ // subscriber drivers
+ sub pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,12 +39,12 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
log: log,
- pubsub: pubsubs,
+ sub: sub,
accessValidator: av,
actualTopics: make(map[string]struct{}, 10),
req: r,
@@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
err = json.Unmarshal(data, msg)
if err != nil {
- e.log.Error("error unmarshal message", "error", err)
+ e.log.Error("unmarshal message", "error", err)
continue
}
// nil message, continue
if msg == nil {
- e.log.Warn("get nil message, skipping")
+ e.log.Warn("nil message, skipping")
continue
}
switch msg.Command {
// handle leave
case commands.Join:
- e.log.Debug("get join command", "msg", msg)
+ e.log.Debug("received join command", "msg", msg)
val, err := e.accessValidator(e.req, msg.Topics...)
if err != nil {
@@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, errJ := json.Marshal(resp)
if errJ != nil {
- e.log.Error("error marshal the body", "error", errJ)
+ e.log.Error("marshal the body", "error", errJ)
return errors.E(op, fmt.Errorf("%v,%v", err, errJ))
}
errW := e.conn.Write(packet)
if errW != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", errW)
+ e.log.Error("write payload to the connection", "payload", packet, "error", errW)
return errors.E(op, fmt.Errorf("%v,%v", err, errW))
}
@@ -115,27 +116,25 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
// subscribe to the topic
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Set(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Set(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
// handle leave
case commands.Leave:
- e.log.Debug("get leave command", "msg", msg)
+ e.log.Debug("received leave command", "msg", msg)
// prepare response
resp := &Response{
@@ -145,21 +144,19 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Leave(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Leave(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
case commands.Headers:
@@ -170,13 +167,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Set(topics []string) error {
// associate connection with topics
- err := br.Subscribe(e.connID, topics...)
+ err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(e.connID, topics...)
+ _ = e.sub.Unsubscribe(e.connID, topics...)
return err
}
@@ -188,11 +185,11 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
- err := br.Unsubscribe(e.connID, topics...)
+ err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
return err
}
@@ -207,10 +204,7 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
func (e *Executor) CleanUp() {
// unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // here
- for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(e.connID, topic)
- }
+ _ = e.sub.Unsubscribe(e.connID, topic)
}
// clean up the actualTopics data
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
index e877fad3..bbc49bbb 100644
--- a/plugins/websockets/origin_test.go
+++ b/plugins/websockets/origin_test.go
@@ -9,9 +9,11 @@ import (
func TestConfig_Origin(t *testing.T) {
cfg := &Config{
AllowedOrigin: "*",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg))
assert.True(t, isOriginAllowed("http://", cfg))
@@ -27,9 +29,11 @@ func TestConfig_Origin(t *testing.T) {
func TestConfig_OriginWildCard(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://*my.site.com",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
@@ -48,9 +52,11 @@ func TestConfig_OriginWildCard(t *testing.T) {
func TestConfig_OriginWildCard2(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://my.*.com",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..ca5f2f59 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -10,14 +9,13 @@ import (
"github.com/fasthttp/websocket"
"github.com/google/uuid"
json "github.com/json-iterator/go"
- endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -26,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -35,14 +32,14 @@ const (
type Plugin struct {
sync.RWMutex
- // Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ // subscriber+reader interfaces
+ subReader pubsub.SubReader
+ // broadcaster
+ broadcaster broadcast.Broadcaster
- cfg *Config
- cfgPlugin config.Configurer
- log logger.Logger
+ cfg *Config
+ log logger.Logger
// global connections map
connections sync.Map
@@ -53,14 +50,16 @@ type Plugin struct {
wsUpgrade *websocket.Upgrader
serveExit chan struct{}
+ // workers pool
phpPool phpPool.Pool
- server server.Server
+ // server which produces commands to the pool
+ server server.Server
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -71,12 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
-
- p.log = log
- p.cfgPlugin = cfg
+ err = p.cfg.InitDefault()
+ if err != nil {
+ return errors.E(op, err)
+ }
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
@@ -88,15 +85,17 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.serveExit = make(chan struct{})
p.server = server
-
+ p.log = log
+ p.broadcaster = b
return nil
}
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
-
- err := p.initPubSubs()
+ errCh := make(chan error, 1)
+ // init broadcaster
+ var err error
+ p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -122,76 +121,26 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
-
- // run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
- for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- p.workersPool.Queue(data)
- }
- }
- }(v)
- }
-
- return errCh
-}
-
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
+ p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
+ // we need here only Reader part of the interface
+ go func(ps pubsub.Reader) {
+ for {
+ select {
+ case <-p.serveExit:
+ return
+ default:
+ data, err := ps.Next()
if err != nil {
- return err
+ errCh <- err
+ return
}
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+ p.workersPool.Queue(data)
}
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
}
- }
+ }(p.subReader)
- return nil
+ return errCh
}
func (p *Plugin) Stop() error {
@@ -208,30 +157,12 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.GetPublishers,
- }
-}
-
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
-}
-
func (p *Plugin) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != p.cfg.Path {
@@ -277,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +292,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index a196d1f0..752ba3ce 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,6 @@ import (
"sync"
json "github.com/json-iterator/go"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -12,21 +11,21 @@ import (
)
type WorkersPool struct {
- storage map[string]pubsub.PubSub
+ subscriber pubsub.Subscriber
connections *sync.Map
resPool sync.Pool
log logger.Logger
- queue chan *websocketsv1.Message
+ queue chan *pubsub.Message
exit chan struct{}
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *websocketsv1.Message, 100),
- storage: pubsubs,
+ queue: make(chan *pubsub.Message, 100),
+ subscriber: subscriber,
log: log,
exit: make(chan struct{}),
}
@@ -43,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
+func (wp *WorkersPool) Queue(msg *pubsub.Message) {
wp.queue <- msg
}
@@ -83,63 +82,48 @@ func (wp *WorkersPool) do() { //nolint:gocognit
return
}
_ = msg
- if msg == nil {
- continue
- }
- if len(msg.GetTopics()) == 0 {
+ if msg == nil || msg.Topic == "" {
continue
}
- br, ok := wp.storage[msg.Broker]
- if !ok {
- wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
+ // get free map
+ res := wp.get()
+
+ // get connections for the particular topic
+ wp.subscriber.Connections(msg.Topic, res)
+
+ if len(res) == 0 {
+ wp.log.Info("no connections associated with provided topic", "topic", msg.Topic)
+ wp.put(res)
continue
}
- // send a message to every topic
- for i := 0; i < len(msg.GetTopics()); i++ {
- // get free map
- res := wp.get()
+ // res is a map with a connectionsID
+ for connID := range res {
+ c, ok := wp.connections.Load(connID)
+ if !ok {
+ wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic)
+ wp.put(res)
+ continue
+ }
- // get connections for the particular topic
- br.Connections(msg.GetTopics()[i], res)
+ d, err := json.Marshal(&Response{
+ Topic: msg.Topic,
+ Payload: utils.AsString(msg.Payload),
+ })
- if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
+ if err != nil {
+ wp.log.Error("error marshaling response", "error", err)
wp.put(res)
- continue
+ break
}
- // res is a map with a connectionsID
- for topic := range res {
- c, ok := wp.connections.Load(topic)
- if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
- wp.put(res)
- continue
- }
-
- response := &Response{
- Topic: msg.GetTopics()[i],
- Payload: utils.AsString(msg.GetPayload()),
- }
-
- d, err := json.Marshal(response)
- if err != nil {
- wp.log.Error("error marshaling response", "error", err)
- wp.put(res)
- break
- }
-
- // put data into the bytes buffer
- err = c.(*connection.Connection).Write(d)
- if err != nil {
- for i := 0; i < len(msg.GetTopics()); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
- }
- wp.put(res)
- continue
- }
+ // put data into the bytes buffer
+ err = c.(*connection.Connection).Write(d)
+ if err != nil {
+ wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic)
+ wp.put(res)
+ continue
}
}
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
deleted file mode 100644
index 341e0b2a..00000000
--- a/plugins/websockets/rpc.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package websockets
-
-import (
- "github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-)
-
-// rpc collectors struct
-type rpc struct {
- plugin *Plugin
- log logger.Logger
-}
-
-// Publish ... msg is a proto decoded payload
-// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("broadcast_publish")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- return errors.E(op, err)
- }
-
- err = r.plugin.Publish(bb)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
- }
-
- out.Ok = true
- return nil
-}
-
-// PublishAsync ...
-// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("publish_async")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
-
- r.plugin.PublishAsync(bb)
- }
-
- out.Ok = true
- return nil
-}
diff --git a/pkg/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 622967b8..622967b8 100644
--- a/pkg/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
diff --git a/pkg/proto/kv/v1beta/kv.proto b/proto/kv/v1beta/kv.proto
index 1e3b8177..1e3b8177 100644
--- a/pkg/proto/kv/v1beta/kv.proto
+++ b/proto/kv/v1beta/kv.proto
diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index d39b55da..ad4ebbe7 100644
--- a/pkg/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -26,9 +26,8 @@ type Message struct {
unknownFields protoimpl.UnknownFields
Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"`
- Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"`
- Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"`
- Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"`
+ Topics []string `protobuf:"bytes,2,rep,name=topics,proto3" json:"topics,omitempty"`
+ Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *Message) Reset() {
@@ -70,13 +69,6 @@ func (x *Message) GetCommand() string {
return ""
}
-func (x *Message) GetBroker() string {
- if x != nil {
- return x.Broker
- }
- return ""
-}
-
func (x *Message) GetTopics() []string {
if x != nil {
return x.Topics
@@ -91,6 +83,7 @@ func (x *Message) GetPayload() []byte {
return nil
}
+// RPC request with messages
type Request struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -138,6 +131,7 @@ func (x *Request) GetMessages() []*Message {
return nil
}
+// RPC response (false in case of error)
type Response struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -190,22 +184,20 @@ var File_websockets_proto protoreflect.FileDescriptor
var file_websockets_proto_rawDesc = []byte{
0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x55, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72,
- 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b,
- 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03,
- 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
- 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79,
- 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
- 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
- 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52,
- 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63,
- 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x33,
+ 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20,
+ 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73,
+ 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22,
+ 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f,
+ 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e,
+ 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65,
+ 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/proto/websockets/v1beta/websockets.proto
index ede3cde9..5be6f70f 100644
--- a/pkg/proto/websockets/v1beta/websockets.proto
+++ b/proto/websockets/v1beta/websockets.proto
@@ -5,9 +5,8 @@ option go_package = "./;websocketsv1beta";
message Message {
string command = 1;
- string broker = 2;
- repeated string topics = 3;
- bytes payload = 4;
+ repeated string topics = 2;
+ bytes payload = 3;
}
// RPC request with messages
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 67d5476b..b6ba0f66 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -9,3 +9,7 @@ services:
image: redis:6
ports:
- "6379:6379"
+ redis2:
+ image: redis:6
+ ports:
+ - "6378:6379"
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
new file mode 100644
index 00000000..2cd4b451
--- /dev/null
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -0,0 +1,474 @@
+package broadcast
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/golang/mock/gomock"
+ endure "github.com/spiral/endure/pkg/container"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
+ "github.com/spiral/roadrunner/v2/tests/plugins/broadcast/plugins"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func TestBroadcastConfigError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-config-error.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+
+ &plugins.Plugin1{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ assert.Error(t, err)
+}
+
+func TestBroadcastNoConfig(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-no-config.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", []string{}).MinTimes(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // should be just disabled
+ _, err = cont.Serve()
+ assert.NoError(t, err)
+}
+
+func TestBroadcastSameSubscriber(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-same-section.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1)
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3)
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+
+ // test - redis
+ // test2 - redis (port 6378)
+ // test3 - memory
+ // test4 - memory
+ &plugins.Plugin1{}, // foo, foo2, foo3 test
+ &plugins.Plugin2{}, // foo, test
+ &plugins.Plugin3{}, // foo, test2
+ &plugins.Plugin4{}, // foo, test3
+ &plugins.Plugin5{}, // foo, test4
+ &plugins.Plugin6{}, // foo, test3
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 2)
+
+ t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002"))
+ t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002"))
+ t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
+ t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
+
+ time.Sleep(time.Second * 4)
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func TestBroadcastSameSubscriberGlobal(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-global.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1)
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3)
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+
+ // test - redis
+ // test2 - redis (port 6378)
+ // test3 - memory
+ // test4 - memory
+ &plugins.Plugin1{}, // foo, foo2, foo3 test
+ &plugins.Plugin2{}, // foo, test
+ &plugins.Plugin3{}, // foo, test2
+ &plugins.Plugin4{}, // foo, test3
+ &plugins.Plugin5{}, // foo, test4
+ &plugins.Plugin6{}, // foo, test3
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 2)
+
+ t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003"))
+ t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003"))
+ t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
+ t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
+
+ time.Sleep(time.Second * 4)
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:"+port)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func BroadcastPublishFoo2(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:"+port)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo"), ret)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func BroadcastPublishFoo3(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:"+port)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo3"), ret)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:"+port)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
+ m := &websocketsv1.Request{
+ Messages: []*websocketsv1.Message{
+ {
+ Topics: topics,
+ Payload: payload,
+ },
+ },
+ }
+
+ return m
+}
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
new file mode 100644
index 00000000..d8daa251
--- /dev/null
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
@@ -0,0 +1,33 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:21345
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+broadcast:
+ default:
+ driver: redis
+
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
index eedf5377..2ca97055 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
@@ -1,5 +1,5 @@
rpc:
- listen: tcp://127.0.0.1:6001
+ listen: tcp://127.0.0.1:6003
server:
command: "php ../../psr-worker-bench.php"
@@ -9,7 +9,7 @@ server:
relay_timeout: "20s"
http:
- address: 127.0.0.1:13235
+ address: 127.0.0.1:21543
max_request_size: 1024
middleware: [ "websockets" ]
trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -23,11 +23,18 @@ redis:
addrs:
- "localhost:6379"
-websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section
- pubsubs: [ "redis", "memory" ]
- path: "/ws"
+broadcast:
+ test:
+ driver: redis
+ test2:
+ driver: redis
+ addrs:
+ - "localhost:6378"
+ test3:
+ driver: memory
+ test4:
+ driver: memory
+
logs:
mode: development
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
new file mode 100644
index 00000000..aa80330e
--- /dev/null
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
@@ -0,0 +1,35 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:21345
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+broadcast:
+ default:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml
new file mode 100644
index 00000000..90790869
--- /dev/null
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml
@@ -0,0 +1,29 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:21345
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
new file mode 100644
index 00000000..360e05e5
--- /dev/null
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
@@ -0,0 +1,43 @@
+rpc:
+ listen: tcp://127.0.0.1:6002
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:21345
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+broadcast:
+ test:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+ test2:
+ driver: redis
+ addrs:
+ - "localhost:6378"
+ test3:
+ driver: memory
+ test4:
+ driver: memory
+
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
new file mode 100644
index 00000000..d3b16256
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -0,0 +1,67 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin1Name = "plugin1"
+
+type Plugin1 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin1) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test")
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ err = p.driver.Subscribe("1", "foo", "foo2", "foo3")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin1) Stop() error {
+ _ = p.driver.Unsubscribe("1", "foo")
+ _ = p.driver.Unsubscribe("1", "foo2")
+ _ = p.driver.Unsubscribe("1", "foo3")
+ return nil
+}
+
+func (p *Plugin1) Name() string {
+ return Plugin1Name
+}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
new file mode 100644
index 00000000..2bd819d2
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -0,0 +1,64 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin2Name = "plugin2"
+
+type Plugin2 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin2) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test")
+ if err != nil {
+ panic(err)
+ }
+
+ err = p.driver.Subscribe("2", "foo")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin2) Stop() error {
+ _ = p.driver.Unsubscribe("2", "foo")
+ return nil
+}
+
+func (p *Plugin2) Name() string {
+ return Plugin2Name
+}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
new file mode 100644
index 00000000..ef926222
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -0,0 +1,64 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin3Name = "plugin3"
+
+type Plugin3 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin3) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test2")
+ if err != nil {
+ panic(err)
+ }
+
+ err = p.driver.Subscribe("3", "foo")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin3) Stop() error {
+ _ = p.driver.Unsubscribe("3", "foo")
+ return nil
+}
+
+func (p *Plugin3) Name() string {
+ return Plugin3Name
+}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
new file mode 100644
index 00000000..c9b94777
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -0,0 +1,64 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin4Name = "plugin4"
+
+type Plugin4 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin4) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test3")
+ if err != nil {
+ panic(err)
+ }
+
+ err = p.driver.Subscribe("4", "foo")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin4) Stop() error {
+ _ = p.driver.Unsubscribe("4", "foo")
+ return nil
+}
+
+func (p *Plugin4) Name() string {
+ return Plugin4Name
+}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
new file mode 100644
index 00000000..01562a8f
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -0,0 +1,64 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin5Name = "plugin5"
+
+type Plugin5 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin5) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test4")
+ if err != nil {
+ panic(err)
+ }
+
+ err = p.driver.Subscribe("5", "foo")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin5) Stop() error {
+ _ = p.driver.Unsubscribe("5", "foo")
+ return nil
+}
+
+func (p *Plugin5) Name() string {
+ return Plugin5Name
+}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
new file mode 100644
index 00000000..76f2d6e8
--- /dev/null
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -0,0 +1,64 @@
+package plugins
+
+import (
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const Plugin6Name = "plugin6"
+
+type Plugin6 struct {
+ log logger.Logger
+ b broadcast.Broadcaster
+ driver pubsub.SubReader
+}
+
+func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
+ p.log = log
+ p.b = b
+ return nil
+}
+
+func (p *Plugin6) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ var err error
+ p.driver, err = p.b.GetDriver("test")
+ if err != nil {
+ panic(err)
+ }
+
+ err = p.driver.Subscribe("6", "foo")
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ for {
+ msg, err := p.driver.Next()
+ if err != nil {
+ panic(err)
+ }
+
+ if msg == nil {
+ continue
+ }
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin6) Stop() error {
+ _ = p.driver.Unsubscribe("6", "foo")
+ return nil
+}
+
+func (p *Plugin6) Name() string {
+ return Plugin6Name
+}
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 24b66ae1..1e466e06 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -12,7 +12,6 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
@@ -21,6 +20,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ payload "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml
index 896cee05..e6c43857 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-allow.yaml
@@ -9,7 +9,7 @@ server:
relay_timeout: "20s"
http:
- address: 127.0.0.1:11113
+ address: 127.0.0.1:41278
max_request_size: 1024
middleware: [ "websockets" ]
trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -23,8 +23,13 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: memory
+
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml
new file mode 100644
index 00000000..d537a80b
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-allow2.yaml
@@ -0,0 +1,44 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../worker-ok.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:41270
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+broadcast:
+ test:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ broker: test
+ allowed_origin: "*"
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml
index fd125794..ada23845 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-broker-no-section.yaml
@@ -19,9 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
+broadcast:
+ test1:
+ driver: no
websockets:
- pubsubs: [ "redis", "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny.yaml
index e3bf5218..594a746d 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-deny.yaml
@@ -9,7 +9,7 @@ server:
relay_timeout: "20s"
http:
- address: 127.0.0.1:11112
+ address: 127.0.0.1:15587
max_request_size: 1024
middleware: [ "websockets" ]
trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml
new file mode 100644
index 00000000..4deea30a
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-deny2.yaml
@@ -0,0 +1,40 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../worker-deny.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:15588
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+broadcast:
+ test:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ broker: test
+ allowed_origin: "*"
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
index dc073be3..14472f8a 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -23,12 +23,16 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ default:
+ driver: memory
+
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section or its own
- pubsubs: [ "redis" ]
+ broker: default
+ allowed_origin: "*"
path: "/ws"
+
logs:
mode: development
level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
index 27eab557..3557f5f1 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
@@ -19,13 +19,17 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-
-websockets:
- pubsubs: [ "redis", "memory" ]
- redis:
+redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: redis
+
+websockets:
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-stop.yaml
index 0614f4e7..5377aef2 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-stop.yaml
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 8321297d..29bf28be 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -16,7 +16,7 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -25,6 +25,7 @@ import (
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/assert"
)
@@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -98,52 +100,20 @@ func TestBroadcastInit(t *testing.T) {
time.Sleep(time.Second * 1)
t.Run("TestWSInit", wsInit)
+ t.Run("RPCWsMemoryPubAsync", RPCWsPubAsync("11111"))
+ t.Run("RPCWsMemory", RPCWsPub("11111"))
stopCh <- struct{}{}
wg.Wait()
}
-func wsInit(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- _ = resp.Body.Close()
- }()
-
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
-}
-
-func TestWSRedisAndMemory(t *testing.T) {
+func TestWSRedis(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis-memory.yaml",
+ Path: "configs/.rr-websockets-redis.yaml",
Prefix: "rr",
}
@@ -155,7 +125,7 @@ func TestWSRedisAndMemory(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -205,21 +175,20 @@ func TestWSRedisAndMemory(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
- t.Run("RPCWsMemory", RPCWsMemory)
- t.Run("RPCWsRedis", RPCWsRedis)
+ t.Run("RPCWsRedisPubAsync", RPCWsPubAsync("13235"))
+ t.Run("RPCWsRedisPub", RPCWsPub("13235"))
stopCh <- struct{}{}
wg.Wait()
}
-func TestWSRedisAndMemoryGlobal(t *testing.T) {
+func TestWSRedisNoSection(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis-memory.yaml",
+ Path: "configs/.rr-websockets-broker-no-section.yaml",
Prefix: "rr",
}
@@ -231,7 +200,37 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &broadcast.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ assert.Error(t, err)
+}
+
+func TestWSDeny(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-deny.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -281,21 +280,19 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
- t.Run("RPCWsMemory", RPCWsMemory)
- t.Run("RPCWsRedis", RPCWsRedis)
+ t.Run("RPCWsMemoryDeny", RPCWsDeny("15587"))
stopCh <- struct{}{}
wg.Wait()
}
-func TestWSRedisNoSection(t *testing.T) {
+func TestWSDeny2(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis-no-section.yaml",
+ Path: "configs/.rr-websockets-deny2.yaml",
Prefix: "rr",
}
@@ -304,10 +301,10 @@ func TestWSRedisNoSection(t *testing.T) {
&rpcPlugin.Plugin{},
&logger.ZapLogger{},
&server.Plugin{},
- &redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
+ &redis.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -316,234 +313,60 @@ func TestWSRedisNoSection(t *testing.T) {
t.Fatal(err)
}
- _, err = cont.Serve()
- assert.Error(t, err)
-}
-
-func RPCWsMemoryPubAsync(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- _ = resp.Body.Close()
- }()
-
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
-
- publishAsync(t, "", "memory", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
-
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ ch, err := cont.Serve()
if err != nil {
- panic(err)
+ t.Fatal(err)
}
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publishAsync(t, "", "memory", "foo")
+ stopCh := make(chan struct{}, 1)
go func() {
- time.Sleep(time.Second * 5)
- publishAsync2(t, "", "memory", "foo2")
- }()
-
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
-}
-
-func RPCWsMemory(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- if resp != nil && resp.Body != nil {
- _ = resp.Body.Close()
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
-
- publish("", "memory", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
-
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
-
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publish("", "memory", "foo")
-
- go func() {
- time.Sleep(time.Second * 5)
- publish2(t, "", "memory", "foo2")
- }()
-
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
-}
-
-func RPCWsRedis(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
-
- defer func() {
- _ = resp.Body.Close()
- }()
-
- d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
-
- publish("", "redis", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
-
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
-
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publish("", "redis", "foo")
-
- go func() {
- time.Sleep(time.Second * 5)
- publish2(t, "", "redis", "foo2")
- }()
+ time.Sleep(time.Second * 1)
+ t.Run("RPCWsRedisDeny", RPCWsDeny("15588"))
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
+ stopCh <- struct{}{}
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
+ wg.Wait()
}
-func TestWSMemoryDeny(t *testing.T) {
+func TestWSStop(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-memory-deny.yaml",
+ Path: "configs/.rr-websockets-stop.yaml",
Prefix: "rr",
}
@@ -556,6 +379,7 @@ func TestWSMemoryDeny(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -605,73 +429,40 @@ func TestWSMemoryDeny(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryDeny", RPCWsMemoryDeny)
+ t.Run("RPCWsStop", RPCWsMemoryStop("11114"))
stopCh <- struct{}{}
wg.Wait()
}
-func RPCWsMemoryDeny(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:11112", Path: "/ws"}
+func RPCWsMemoryStop(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NoError(t, err)
- assert.NotNil(t, c)
- assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
+ connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
- defer func() {
- if resp != nil && resp.Body != nil {
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NotNil(t, resp)
+ assert.Error(t, err)
+ assert.Nil(t, c)
+ assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck
+ assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck
+ if resp != nil && resp.Body != nil { //nolint:staticcheck
_ = resp.Body.Close()
}
- }()
-
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
- if err != nil {
- panic(err)
}
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err := c.ReadMessage()
- retMsg := utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
-
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
-
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
-
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
-
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
-
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
}
-func TestWSMemoryStop(t *testing.T) {
+func TestWSAllow(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-memory-stop.yaml",
+ Path: "configs/.rr-websockets-allow.yaml",
Prefix: "rr",
}
@@ -684,6 +475,7 @@ func TestWSMemoryStop(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -733,38 +525,19 @@ func TestWSMemoryStop(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryStop", RPCWsMemoryStop)
+ t.Run("RPCWsMemoryAllow", RPCWsPub("41278"))
stopCh <- struct{}{}
wg.Wait()
}
-func RPCWsMemoryStop(t *testing.T) {
- da := websocket.Dialer{
- Proxy: http.ProxyFromEnvironment,
- HandshakeTimeout: time.Second * 20,
- }
-
- connURL := url.URL{Scheme: "ws", Host: "localhost:11114", Path: "/ws"}
-
- c, resp, err := da.Dial(connURL.String(), nil)
- assert.NotNil(t, resp)
- assert.Error(t, err)
- assert.Nil(t, c)
- assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) //nolint:staticcheck
- assert.Equal(t, resp.Header.Get("Stop"), "we-dont-like-you") //nolint:staticcheck
- if resp != nil && resp.Body != nil { //nolint:staticcheck
- _ = resp.Body.Close()
- }
-}
-
-func TestWSMemoryOk(t *testing.T) {
+func TestWSAllow2(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-memory-allow.yaml",
+ Path: "configs/.rr-websockets-allow2.yaml",
Prefix: "rr",
}
@@ -777,6 +550,7 @@ func TestWSMemoryOk(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -826,33 +600,29 @@ func TestWSMemoryOk(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryAllow", RPCWsMemoryAllow)
+ t.Run("RPCWsMemoryAllow", RPCWsPub("41270"))
stopCh <- struct{}{}
wg.Wait()
}
-func RPCWsMemoryAllow(t *testing.T) {
+func wsInit(t *testing.T) {
da := websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: time.Second * 20,
}
- connURL := url.URL{Scheme: "ws", Host: "localhost:11113", Path: "/ws"}
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
c, resp, err := da.Dial(connURL.String(), nil)
assert.NoError(t, err)
- assert.NotNil(t, c)
- assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
defer func() {
- if resp != nil && resp.Body != nil {
- _ = resp.Body.Close()
- }
+ _ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -867,64 +637,219 @@ func RPCWsMemoryAllow(t *testing.T) {
// subscription done
assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
- publish("", "memory", "foo")
-
- // VERIFY a makeMessage
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+}
- // //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
- if err != nil {
- panic(err)
- }
+func RPCWsPubAsync(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 18,
+ }
- err = c.WriteMessage(websocket.BinaryMessage, d)
- assert.NoError(t, err)
+ connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
- // subscription done
- assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+ defer func() {
+ _ = resp.Body.Close()
+ }()
- // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
- publish("", "memory", "foo")
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
- go func() {
- time.Sleep(time.Second * 5)
- publish2(t, "", "memory", "foo2")
- }()
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
- // should be only makeMessage from the subscribed foo2 topic
- _, msg, err = c.ReadMessage()
- retMsg = utils.AsString(msg)
- assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
- err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
- assert.NoError(t, err)
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publishAsync(t, "placeholder", "foo")
+
+ // VERIFY a makeMessage
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+
+ // //// LEAVE foo /////////
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publishAsync(t, "placeholder", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 3)
+ publishAsync(t, "placeholder", "foo2")
+ }()
+
+ // should be only makeMessage from the subscribed foo0 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+ }
}
-func publish(command string, broker string, topics ...string) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
- if err != nil {
- panic(err)
+func RPCWsPub(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ publish("", "foo")
+
+ // VERIFY a makeMessage
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ // TRY TO PUBLISH TO UNSUBSCRIBED TOPIC
+ publish("", "foo")
+
+ go func() {
+ time.Sleep(time.Second * 5)
+ publish2(t, "", "foo2")
+ }()
+
+ // should be only makeMessage from the subscribed foo2 topic
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+ assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
}
+}
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+func RPCWsDeny(port string) func(t *testing.T) {
+ return func(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
- ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
- if err != nil {
- panic(err)
+ connURL := url.URL{Scheme: "ws", Host: "localhost:" + port, Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
+
+ defer func() {
+ if resp != nil && resp.Body != nil {
+ _ = resp.Body.Close()
+ }
+ }()
+
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
+
+ // //// LEAVE foo, foo2 /////////
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err = c.ReadMessage()
+ retMsg = utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@leave","payload":["foo"]}`, retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
}
}
-func publishAsync(t *testing.T, command string, broker string, topics ...string) {
+// ---------------------------------------------------------------------------------------------------
+
+func publish(command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -933,12 +858,13 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
- assert.NoError(t, err)
- assert.True(t, ret.Ok)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret)
+ if err != nil {
+ panic(err)
+ }
}
-func publishAsync2(t *testing.T, command string, broker string, topics ...string) {
+func publishAsync(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -947,12 +873,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publish2(t *testing.T, command string, broker string, topics ...string) {
+func publish2(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -961,27 +887,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message {
return &websocketsv1.Message{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request {
+func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request {
m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
},
},