summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
commitfe7bb0fe758d573fe353df028257ed66c6eccf66 (patch)
tree74392f8e61e96c85f0d8b684cfc08e3fc3664ae9
parent68ff941c4226074206ceed9c30bd95317aa0e9fc (diff)
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/interface/broadcast/broadcast.go4
-rw-r--r--pkg/interface/pubsub/interface.go (renamed from pkg/pubsub/interface.go)0
-rw-r--r--pkg/proto/websockets/v1beta/websockets.pb.go42
-rw-r--r--pkg/proto/websockets/v1beta/websockets.proto5
-rw-r--r--plugins/broadcast/config.go6
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio2
-rw-r--r--plugins/broadcast/plugin.go135
-rw-r--r--plugins/kv/plugin.go5
-rw-r--r--plugins/memory/plugin.go2
-rw-r--r--plugins/memory/pubsub.go2
-rw-r--r--plugins/redis/plugin.go2
-rw-r--r--plugins/redis/pubsub.go7
-rw-r--r--plugins/websockets/config.go16
-rw-r--r--plugins/websockets/executor/executor.go41
-rw-r--r--plugins/websockets/origin_test.go9
-rw-r--r--plugins/websockets/plugin.go140
-rw-r--r--plugins/websockets/pool/workers_pool.go20
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go7
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-init.yaml14
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml12
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml7
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml9
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml9
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml13
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml3
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-redis.yaml (renamed from tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml)9
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go65
27 files changed, 328 insertions, 258 deletions
diff --git a/pkg/interface/broadcast/broadcast.go b/pkg/interface/broadcast/broadcast.go
index c922c82e..4c49f7c5 100644
--- a/pkg/interface/broadcast/broadcast.go
+++ b/pkg/interface/broadcast/broadcast.go
@@ -1,7 +1,7 @@
package broadcast
-import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+import "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
type Broadcaster interface {
- GetDriver(key string) pubsub.SubReader
+ GetDriver(key string) (pubsub.SubReader, error)
}
diff --git a/pkg/pubsub/interface.go b/pkg/interface/pubsub/interface.go
index 30b544db..30b544db 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/interface/pubsub/interface.go
diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go
index d39b55da..ad4ebbe7 100644
--- a/pkg/proto/websockets/v1beta/websockets.pb.go
+++ b/pkg/proto/websockets/v1beta/websockets.pb.go
@@ -26,9 +26,8 @@ type Message struct {
unknownFields protoimpl.UnknownFields
Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"`
- Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"`
- Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"`
- Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"`
+ Topics []string `protobuf:"bytes,2,rep,name=topics,proto3" json:"topics,omitempty"`
+ Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *Message) Reset() {
@@ -70,13 +69,6 @@ func (x *Message) GetCommand() string {
return ""
}
-func (x *Message) GetBroker() string {
- if x != nil {
- return x.Broker
- }
- return ""
-}
-
func (x *Message) GetTopics() []string {
if x != nil {
return x.Topics
@@ -91,6 +83,7 @@ func (x *Message) GetPayload() []byte {
return nil
}
+// RPC request with messages
type Request struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -138,6 +131,7 @@ func (x *Request) GetMessages() []*Message {
return nil
}
+// RPC response (false in case of error)
type Response struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -190,22 +184,20 @@ var File_websockets_proto protoreflect.FileDescriptor
var file_websockets_proto_rawDesc = []byte{
0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x55, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72,
- 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b,
- 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03,
- 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
- 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79,
- 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
- 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
- 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52,
- 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63,
- 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x33,
+ 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20,
+ 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73,
+ 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22,
+ 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f,
+ 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e,
+ 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65,
+ 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto
index ede3cde9..5be6f70f 100644
--- a/pkg/proto/websockets/v1beta/websockets.proto
+++ b/pkg/proto/websockets/v1beta/websockets.proto
@@ -5,9 +5,8 @@ option go_package = "./;websocketsv1beta";
message Message {
string command = 1;
- string broker = 2;
- repeated string topics = 3;
- bytes payload = 4;
+ repeated string topics = 2;
+ bytes payload = 3;
}
// RPC request with messages
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 18846f30..4f1e5213 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -1,6 +1,9 @@
package broadcast
/*
+
+# Global redis config (priority - 2)
+
websockets: # <----- one of possible subscribers
path: /ws
broker: default # <------ broadcast broker to use --------------- |
@@ -8,9 +11,12 @@ websockets: # <----- one of possible subscribers
broadcast: # <-------- broadcast entry point plugin |
default: # <----------------------------------------------------- |
driver: redis
+ # local redis config (priority - 1)
test:
driver: memory
+
+priority local -> global
*/
// Config ...
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
index b8d2947e..b2ee091a 100644
--- a/plugins/broadcast/doc/broadcast_arch.drawio
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-17T18:52:07.846Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="eUG-4GjLSDkFvWQsKkB7" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5u4Gv41nml3xhkkcf0YO222M5vTnHg7bT+dwUa22WLLCziX/fVHAoFBkmMcrsmmlwSEJOC9v48ujNB083gdurv1DfFwMIKa9zhCVyMIoWZC+ouVPKUlCAIjLVmFvpeWgUPBzP8H80KNl+59D0elijEhQezvyoULst3iRVwqc8OQPJSrLUlQvuvOXWGpYLZwA7n0u+/F6+zFNO1w4Xfsr9axeGXjZrV5QbR2PfJQKEKfRmgaEhKnR5vHKQ4Y+TLCpO0+H7maP1mIt3GVBlPdNL9Mrrdj6wcaz7b36x/kemzqaTf3brDnr8yfNn7KaBCS/dbDrBdthCYPaz/Gs527YFcfKNtp2TreBPQM0MOlHwRTEpAwaYs8F9vLBS2P4pD8woUr5sLG8yW9Ir8Hf7V7HMb4sVDE3+sakw2OwydahV9FNqcxFzPd4OcPJZ6lZesiuxxe6HI5WeV9HyhJDzgxzyCsYbZL2KXB/ioJm/xhLcg2LpSnf5ohONBAmeKWJVNctxUUR6bRFsWNt01xWKb4OBfoHknesvXom+SCkCPN6JvijkRf7FGHxU9JGK/Jimzd4NOhdFLmwKHOH4TsON3/wnH8xL2vu49JmSv40Y9/sOYXBj/7yTtjx1ePxZOn7GRLX7fQiJ3+zPpjJ4dmyVnW7ijfIrIPF/gZ0mThgRuucPxcPc4wRrhnxSDEgRv79+VIQMVR3vSW+PSZc/GxoHYBdGgbFv9ZkiVDEzxP+ty8D0FK8od6ueBkEtmX5MCi6ICKolMUnIIcHREd0XjYC7xQhh5z22ARQhfClsVXDQvbZRi6T4UKOyY20XFZNG2jJH2mLsSKYn1Qr74Nnq+PNK1WfcMxBCVJKdKoymQO4+DcJndfL6+ml7M/m/VyNQS1ujczDYGEiogN6ApnZrQVIut9WiRQMEcHv3aeQYLnWqTlEqoZ7Zlz0zBrWSRY1f01bZBqyQCUdOzu09WXGS26/TaZfZs0q2hLbB6hv+XMtYYUTRcC9TxCLCoa7FLRenX9TSjakPQMvUo9Q5Ke3Xy6+Xr38y0pmtKjdapo1mtXNGtImma+Sk2Tcb/Lm//ejtBn9u8NaZtp9q1ttkRp+uoBGwWgN0uau/euH7hzSmT68vt5tJ/TAy+kMhBGEgsoHeIynd3AX23p8YJSDVOSThi1/IUbXPILG9/zUiXGkf9PcqOUgTwfo/0ak5FxxfqiehulKgwkVm3JFreDhBu6zCU1Et4WlwDq0ygOHLLSqpq4Ydm47LmLqXFIXG/hRkz5fKYuS2a9mjRznoFtT1eZORvOkfmsrzkjTTZfGL3rrSmQ+a5Ap/TitAIZTSvQi3A4S3Ci2fjjMdzLsexa9c0T9QFwtOcatAOsGbBfibaqi3QHoqk3bttfJpoWKkkCsM8UHaFBS6LTazRRTrGsqjnW4KwhdAYhcrYgQTqPFY+JnFgfAONcGRVatDRwII8cjKAZsOxiV5Jd8+89m/0z2VCG+TShuKRXtd0j/ZnIgZaWj2MmpuyaXrjG0pUxz1DYNZ6k5H3SoxX/7W5YZBXIZ4VAjReXm2RnSeV5WOgyLckK8uAveQB+o89zsT4t24lla6nX5kjErwV4GZ+mX1JLot41jq+SfPHDL/yUPFjob1cf6dFsP7/Drld648LbCUYqWrs7dhgmk9SKtoXQzpdBMjGMBbxHwuIGcZczYmFhVA8pUv58pl4xFrZai4Xf5z+cDHFPW/+OJkBI1tcQoaCW5zwAeUbYdzyPyOIXjmX8p05uOgeet9RU+gg0CzkNgTsAIBHxti7kKUnK9BS1NSMJWBKRv2z9+AOD2nK/8FGi9ptA26RJeSZUoAXdwm0yKnqHF5hpMdTmlUCaN8oaBRIKzC5Zk1vy15S7FH1XhfGhl7uvjOyn3RfsyH1BEUtH2gUD2B2D/6zkzM5NikAePQkQ/tEkR25gC7LaQo4De529M+xIrLoot4NKKiOxC6AhXbehZdgmLEM9urgKouW4LKNPwUd9fdim/ukXS6i0iKZJPtkmbmqUrO5Z+qs3EbEBhRfqdDgB9jseV/JCsKIXAmUvdHLeXQ3VhVVVt/Ekqh5XoaRSGS7h+fcZLPFwyH4Y5pHjFoUqilY5HLSdR7sUPOGKmgAnHl66+yCu050bBOQBe/8joZ+DPAcs5rcSLPPim+ySBXnlrmk/D9Hp7uvYnSx2TlGonswQElay6I4ibVRN/m0NyYHvo5onw4LTRsjqKn6wnYvDQhY9u2+GC3YdP8i4To40XBwQ29xIcQvFUNuxiG0XcdzXF15owqR+HcnhBVSuUGtNr2U4aEbFgQkTfsQLqpGU0NNEJF2P/trgKHJXmBphLWBa/AqZIMZ4SIU0dBvj2e/G9ShtnIrGFbWzdKvCIGW+7rQre+pIOvs1XieJ2G2wXyUpmIxdDX8ul6SZNAXuHzBHcvb77wXMgeYojGWniDmSR+vfEfNUfVRgRaeQOYJ9OrKhQ+aZhzrtysyOXJkImTtOL5A5yHbhqYyZ0xYdgObofS58A8JsdxeXPQOaOx1PZsjo0ydo7mkLjKEqbDNMA1htJVRO1YV9rSVUqN/dM0p+aHigOaq6Uq/5lKoeV1V7dElQcpoKNYeXxziKYY3Ojs3G7A607sgMiKC1Y/QNWqNeF/gO3H/bVY1AO5PPzwWt7a79tzwxa2igdUd6LYLWtgIC6Ba0RjIANsPhfcPrF/sJnmzFNoHdBk9Zx+9W8/jir9OrxLpK4UU0WhSLls2kLqNxZSxaRoRq7Z3geo6n3GHSxTpAsCWlNPuHoXUoEfpfBEOL8E/v87Z1eYeedxS6FOH3BkLrA9qobnggtF41+de7wu1EEBqWMWhg65U8Wm0UGp2JQXPXd7S+NImrVL8dxFrvedvYQcduVSU/y3d6RqzHooVsO5ST4bXOEeuu47v+kqx+oamBI9R6VXCq+TSrHldl0OgYQp1mR9NUhZpFq8E5nfWOTnek8qI3zlcbn0SnW8vo3hfHn3TBp311O7vxnItOj0HHzjoj0IDh6Y4UW4Snx4od4LqFpw0ZE2senu4JCRuD3gf3zTc/K6fLL2MYVkVLO6wtLQ153cLd7bSMtx2LbaKdu83Kcqju4nY/D/xo/YGvcPhYiISKDRpVYgw8A1sqzjqmhdym9gVC5X2BLNUQk9WpCve6tcJZKkxPbnHo0/dmfrSvcKhqPmQ0vsKsnpbK+dA+UoQZ2XZb+01wuWBLjg7Q9x/uHAe3JPIT5AFdzUkck81IxsaTLcVKO3Xt48DfUqXKPjX5LA+qq5OlCR/PUXy9ECm0yWxNm2TgZoa33ihfrcUkh7A70//czCmY8CbGI5AA0lqq/YBVtq614QhT9lS3Ibn3PcaW0k58KZNIafj0je57bgofiIJVt6prgk0T6/Y/3/+cgL+nd5vV1PxBfp99U3096jCKV8flHyGNgoBHqWUIm1wD3aw4HN2EB1eSC0rkSjPOxLWsfCo3LoeKo8Uab9zBinEtxojD0nqXK3CUfBnQ2Gc73w5SxjcnoyolrRSfBjpuGjoIoZ57yIF9cauW1ogDvtScVcQUmvg2iZLK8g4F/NskUXo3Su0Z99rDDZxqMUUHIlMUYa0SIG/NlslhbZEJ2oIl5douj6W8zAHJmf/cjTCLhxOHROOrQztWmo5j7nOPlWwdXJnHfFRj+ByW1E61uZUyc3kBUkpPQ8I26TmA3tT4rG+Ih1mN/wM=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3b771746..c43b2e4c 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -1,25 +1,37 @@
package broadcast
import (
+ "fmt"
"sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
-const PluginName string = "broadcast"
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
type Plugin struct {
sync.RWMutex
- log logger.Logger
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// publishers implement Publisher interface
// and able to receive a payload
- publishers map[string]pubsub.Publisher
+ publishers map[string]pubsub.PubSub
+ providers map[string]pubsub.PSProvider
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
@@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.providers = make(map[string]pubsub.PSProvider)
- p.publishers = make(map[string]pubsub.Publisher)
p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error, 1)
+
+ // iterate over config
+ for k, v := range p.cfg.Data {
+ if v == nil {
+ continue
+ }
+
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+
+ switch v.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.providers[memory]; !ok {
+ p.log.Warn("no memory drivers registered", "registered", p.publishers)
+ continue
+ }
+ ps, err := p.providers[memory].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case redis:
+ if _, ok := p.providers[redis]; !ok {
+ p.log.Warn("no redis drivers registered", "registered", p.publishers)
+ continue
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ continue
+ }
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
return nil
}
@@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
- p.publishers[name.Name()] = subscriber
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) {
+ // key redis, value - interface
+ p.providers[name.Name()] = subscriber
}
// Publish is an entry point to the websocket PUBSUB
@@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) {
// Get payload
for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.publishers[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ p.publishers[j].PublishAsync(m)
}
- } else {
- p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ return
}
+ p.log.Warn("no publishers registered")
}
}()
}
-func (p *Plugin) GetDriver(key string) pubsub.SubReader {
- println(key)
- return nil
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
+ const op = errors.Op("broadcast_plugin_get_driver")
+ // key - driver, default for example
+ // we should find `default` in the collected pubsubs providers
+ if pub, ok := p.publishers[key]; ok {
+ return pub, nil
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
}
func (p *Plugin) RPC() interface{} {
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index efe92252..716e0d4c 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -85,6 +85,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
when user requests for example boltdb-south, we should provide that particular preconfigured storage
*/
for k, v := range p.cfg.Data {
+ // for example if the key not properly formatted (yaml)
+ if v == nil {
+ continue
+ }
+
if _, ok := v.(map[string]interface{})[driver]; !ok {
errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
return errCh
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6151ebf0..d4d535bf 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,7 +2,7 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 75cd9d24..02246a8f 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24c21b55..8d997041 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,7 +5,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index dbda7ea4..c2a88abe 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,8 +6,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -62,6 +62,11 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
MasterName: ps.cfg.MasterName,
})
+ statusCmd := ps.universalClient.Ping(context.Background())
+ if statusCmd.Err() != nil {
+ return nil, statusCmd.Err()
+ }
+
ps.fanin = newFanIn(ps.universalClient, log)
ps.stop()
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index b1d5d0a8..933a12e0 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -4,6 +4,7 @@ import (
"strings"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
)
@@ -17,9 +18,9 @@ websockets:
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
- Path string `mapstructure:"path"`
-
+ Path string `mapstructure:"path"`
AllowedOrigin string `mapstructure:"allowed_origin"`
+ Broker string `mapstructure:"broker"`
// wildcard origin
allowedWOrigins []wildcard
@@ -31,11 +32,16 @@ type Config struct {
}
// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
+func (c *Config) InitDefault() error {
if c.Path == "" {
c.Path = "/ws"
}
+ // broker is mandatory
+ if c.Broker == "" {
+ return errors.Str("broker key should be specified")
+ }
+
if c.Pool == nil {
c.Pool = &pool.Config{}
if c.Pool.NumWorkers == 0 {
@@ -64,7 +70,7 @@ func (c *Config) InitDefault() {
if origin == "*" {
// If "*" is present in the list, turn the whole list into a match all
c.allowedAll = true
- return
+ return nil
} else if i := strings.IndexByte(origin, '*'); i >= 0 {
// Split the origin in two: start and end string without the *
w := wildcard{origin[0:i], origin[i+1:]}
@@ -72,4 +78,6 @@ func (c *Config) InitDefault() {
} else {
c.allowedOrigins = append(c.allowedOrigins, origin)
}
+
+ return nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 07f22043..799312ad 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,8 +7,8 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -28,8 +28,8 @@ type Executor struct {
// associated connection ID
connID string
- // map with the pubsub drivers
- pubsub map[string]pubsub.Subscriber
+ // subscriber drivers
+ sub pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,12 +38,12 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
log: log,
- pubsub: pubsubs,
+ sub: sub,
accessValidator: av,
actualTopics: make(map[string]struct{}, 10),
req: r,
@@ -126,11 +126,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
// subscribe to the topic
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Set(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Set(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
// handle leave
@@ -155,11 +153,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Leave(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Leave(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
case commands.Headers:
@@ -170,13 +166,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Set(topics []string) error {
// associate connection with topics
- err := br.Subscribe(e.connID, topics...)
+ err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(e.connID, topics...)
+ _ = e.sub.Unsubscribe(e.connID, topics...)
return err
}
@@ -188,9 +184,9 @@ func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
- err := br.Unsubscribe(e.connID, topics...)
+ err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -207,10 +203,7 @@ func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
func (e *Executor) CleanUp() {
// unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // here
- for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(e.connID, topic)
- }
+ _ = e.sub.Unsubscribe(e.connID, topic)
}
// clean up the actualTopics data
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
index e877fad3..ec6e1960 100644
--- a/plugins/websockets/origin_test.go
+++ b/plugins/websockets/origin_test.go
@@ -11,7 +11,8 @@ func TestConfig_Origin(t *testing.T) {
AllowedOrigin: "*",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg))
assert.True(t, isOriginAllowed("http://", cfg))
@@ -29,7 +30,8 @@ func TestConfig_OriginWildCard(t *testing.T) {
AllowedOrigin: "https://*my.site.com",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
@@ -50,7 +52,8 @@ func TestConfig_OriginWildCard2(t *testing.T) {
AllowedOrigin: "https://my.*.com",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf861c72..de7443fd 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -9,13 +9,12 @@ import (
"github.com/fasthttp/websocket"
"github.com/google/uuid"
json "github.com/json-iterator/go"
- endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -33,16 +32,14 @@ const (
type Plugin struct {
sync.RWMutex
- // Collection with all available pubsubs
- //pubsubs map[string]pubsub.PubSub
- //psProviders map[string]pubsub.PSProvider
+ // subscriber+reader interfaces
+ subReader pubsub.SubReader
+ // broadcaster
+ broadcaster broadcast.Broadcaster
- subReaders map[string]pubsub.SubReader
-
- cfg *Config
- cfgPlugin config.Configurer
- log logger.Logger
+ cfg *Config
+ log logger.Logger
// global connections map
connections sync.Map
@@ -53,8 +50,10 @@ type Plugin struct {
wsUpgrade *websocket.Upgrader
serveExit chan struct{}
+ // workers pool
phpPool phpPool.Pool
- server server.Server
+ // server which produces commands to the pool
+ server server.Server
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
@@ -71,14 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.cfg.InitDefault()
- //p.pubsubs = make(map[string]pubsub.PubSub)
- //p.psProviders = make(map[string]pubsub.PSProvider)
-
- p.subReaders = make(map[string]pubsub.SubReader)
-
- p.log = log
- p.cfgPlugin = cfg
+ err = p.cfg.InitDefault()
+ if err != nil {
+ return errors.E(op, err)
+ }
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
@@ -90,19 +85,21 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.serveExit = make(chan struct{})
p.server = server
-
+ p.log = log
+ p.broadcaster = b
return nil
}
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
-
- //err := p.initPubSubs()
- //if err != nil {
- // errCh <- errors.E(op, err)
- // return errCh
- //}
+ errCh := make(chan error, 1)
+ // init broadcaster
+ var err error
+ p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
go func() {
var err error
@@ -124,78 +121,28 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.subReaders {
- go func(ps pubsub.SubReader) {
- for {
- select {
- case <-p.serveExit:
+ go func(ps pubsub.Reader) {
+ for {
+ select {
+ case <-p.serveExit:
+ return
+ default:
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- p.workersPool.Queue(data)
}
+ p.workersPool.Queue(data)
}
- }(v)
- }
+ }
+ }(p.subReader)
return errCh
}
-//func (p *Plugin) initPubSubs() error {
-// for i := 0; i < len(p.cfg.PubSubs); i++ {
-// // don't need to have a section for the in-memory
-// if p.cfg.PubSubs[i] == "memory" {
-// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
-// r, err := provider.PSProvide("")
-// if err != nil {
-// return err
-// }
-//
-// // append default in-memory provider
-// p.pubsubs["memory"] = r
-// }
-// continue
-// }
-// // key - memory, redis
-// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
-// // try local key
-// switch {
-// // try local config first
-// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
-// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
-// if err != nil {
-// return err
-// }
-//
-// // append redis provider
-// p.pubsubs[p.cfg.PubSubs[i]] = r
-// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
-// r, err := provider.PSProvide(p.cfg.PubSubs[i])
-// if err != nil {
-// return err
-// }
-//
-// // append redis provider
-// p.pubsubs[p.cfg.PubSubs[i]] = r
-// default:
-// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
-// }
-// } else {
-// // no such driver
-// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
-// }
-// }
-//
-// return nil
-//}
-
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
@@ -210,23 +157,12 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.GetSubsReader,
- }
-}
-
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
return PluginName
}
-// GetSubsReader collects all plugins which implement SubReader interface
-func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
- p.subReaders[name.Name()] = pub
-}
-
func (p *Plugin) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != p.cfg.Path {
@@ -272,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 22042d8d..cd9444da 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,15 +4,15 @@ import (
"sync"
json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
- storage map[string]pubsub.SubReader
+ subscriber pubsub.Subscriber
connections *sync.Map
resPool sync.Pool
log logger.Logger
@@ -22,11 +22,11 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *websocketsv1.Message, 100),
- storage: pubsubs,
+ subscriber: subscriber,
log: log,
exit: make(chan struct{}),
}
@@ -90,19 +90,13 @@ func (wp *WorkersPool) do() { //nolint:gocognit
continue
}
- br, ok := wp.storage[msg.Broker]
- if !ok {
- wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
- continue
- }
-
// send a message to every topic
for i := 0; i < len(msg.GetTopics()); i++ {
// get free map
res := wp.get()
// get connections for the particular topic
- br.Connections(msg.GetTopics()[i], res)
+ wp.subscriber.Connections(msg.GetTopics()[i], res)
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
@@ -114,7 +108,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for topic := range res {
c, ok := wp.connections.Load(topic)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
+ wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i])
wp.put(res)
continue
}
@@ -135,7 +129,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit
err = c.(*connection.Connection).Write(d)
if err != nil {
for i := 0; i < len(msg.GetTopics()); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
+ wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i])
}
wp.put(res)
continue
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 65ee4415..5b195bd0 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -98,8 +98,7 @@ func TestBroadcastInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 1)
- //t.Run("TestWSInit", wsInit)
+ t.Run("TestWSInit", wsInit)
stopCh <- struct{}{}
@@ -186,7 +185,7 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
return &websocketsv1.Message{
Topics: topics,
Command: command,
- Broker: broker,
+ //Broker: broker,
Payload: payload,
}
}
@@ -197,7 +196,7 @@ func makeMessage(command string, broker string, payload []byte, topics ...string
{
Topics: topics,
Command: command,
- Broker: broker,
+ //Broker: broker,
Payload: payload,
},
},
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
index fa4116d0..6962eeb5 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
@@ -23,19 +23,21 @@ redis:
addrs:
- "localhost:6379"
+
broadcast:
- default:
- driver: redis
- test:
- driver: memory
+ default:
+ driver: redis
+ addrs:
+ - "localhost:6379"
websockets:
- pubsubs: [ "redis" ]
+ broker: default
+ allowed_origin: "*"
path: "/ws"
logs:
mode: development
- level: error
+ level: debug
endure:
grace_period: 120s
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
index dc073be3..b6882d84 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -23,12 +23,18 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ default:
+ driver: redis
+ addrs:
+ - "localhost:6379"
+
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section or its own
- pubsubs: [ "redis" ]
+ broker: default
+ allowed_origin: "*"
path: "/ws"
+
logs:
mode: development
level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
index 896cee05..f81e13e3 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml
@@ -23,8 +23,13 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: memory
+
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
index e3bf5218..decb7dcf 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
index 0614f4e7..5377aef2 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml
@@ -19,12 +19,13 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
-redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
websockets:
- pubsubs: [ "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
index 27eab557..a077bf9e 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml
@@ -20,17 +20,18 @@ http:
destroy_timeout: 60s
-websockets:
- pubsubs: [ "redis", "memory" ]
- redis:
- addrs:
- - "localhost:6379"
+broadcast:
+ test:
+ driver: memory
+websockets:
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
mode: development
- level: error
+ level: debug
endure:
grace_period: 120s
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
index fd125794..d80993f2 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml
@@ -21,7 +21,8 @@ http:
websockets:
- pubsubs: [ "redis", "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
index eedf5377..3557f5f1 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml
@@ -23,10 +23,13 @@ redis:
addrs:
- "localhost:6379"
+broadcast:
+ test:
+ driver: redis
+
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
- # pubsubs might use general config section
- pubsubs: [ "redis", "memory" ]
+ broker: test
+ allowed_origin: "*"
path: "/ws"
logs:
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 8321297d..cb78117f 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -17,6 +17,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -98,6 +100,8 @@ func TestBroadcastInit(t *testing.T) {
time.Sleep(time.Second * 1)
t.Run("TestWSInit", wsInit)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
+ t.Run("RPCWsMemory", RPCWsMemory)
stopCh <- struct{}{}
@@ -119,7 +123,7 @@ func wsInit(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -156,6 +160,7 @@ func TestWSRedisAndMemory(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -219,7 +224,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "configs/.rr-websockets-redis-memory.yaml",
+ Path: "configs/.rr-websockets-redis.yaml",
Prefix: "rr",
}
@@ -232,6 +237,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -281,8 +287,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
- t.Run("RPCWsMemory", RPCWsMemory)
+
t.Run("RPCWsRedis", RPCWsRedis)
stopCh <- struct{}{}
@@ -308,6 +313,7 @@ func TestWSRedisNoSection(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -326,7 +332,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
HandshakeTimeout: time.Second * 20,
}
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
c, resp, err := da.Dial(connURL.String(), nil)
assert.NoError(t, err)
@@ -335,7 +341,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -359,7 +365,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -386,7 +392,7 @@ func RPCWsMemoryPubAsync(t *testing.T) {
_, msg, err = c.ReadMessage()
retMsg = utils.AsString(msg)
assert.NoError(t, err)
- assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg)
+ assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg)
err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
assert.NoError(t, err)
@@ -398,7 +404,7 @@ func RPCWsMemory(t *testing.T) {
HandshakeTimeout: time.Second * 20,
}
- connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"}
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
c, resp, err := da.Dial(connURL.String(), nil)
assert.NoError(t, err)
@@ -409,7 +415,7 @@ func RPCWsMemory(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -433,7 +439,7 @@ func RPCWsMemory(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -481,7 +487,7 @@ func RPCWsRedis(t *testing.T) {
_ = resp.Body.Close()
}()
- d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -505,7 +511,7 @@ func RPCWsRedis(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -556,6 +562,7 @@ func TestWSMemoryDeny(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -631,7 +638,7 @@ func RPCWsMemoryDeny(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -647,7 +654,7 @@ func RPCWsMemoryDeny(t *testing.T) {
assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -684,6 +691,7 @@ func TestWSMemoryStop(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -777,6 +785,7 @@ func TestWSMemoryOk(t *testing.T) {
&websockets.Plugin{},
&httpPlugin.Plugin{},
&memory.Plugin{},
+ &broadcast.Plugin{},
)
assert.NoError(t, err)
@@ -852,7 +861,7 @@ func RPCWsMemoryAllow(t *testing.T) {
}
}()
- d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2"))
if err != nil {
panic(err)
}
@@ -876,7 +885,7 @@ func RPCWsMemoryAllow(t *testing.T) {
assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg)
// //// LEAVE foo, foo2 /////////
- d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo"))
+ d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo"))
if err != nil {
panic(err)
}
@@ -909,7 +918,7 @@ func RPCWsMemoryAllow(t *testing.T) {
assert.NoError(t, err)
}
-func publish(command string, broker string, topics ...string) {
+func publish(command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -918,13 +927,13 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
}
-func publishAsync(t *testing.T, command string, broker string, topics ...string) {
+func publishAsync(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -933,12 +942,12 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publishAsync2(t *testing.T, command string, broker string, topics ...string) {
+func publishAsync2(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -947,12 +956,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func publish2(t *testing.T, command string, broker string, topics ...string) {
+func publish2(t *testing.T, command string, topics ...string) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
if err != nil {
panic(err)
@@ -961,27 +970,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
ret := &websocketsv1.Response{}
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret)
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message {
return &websocketsv1.Message{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request {
+func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request {
m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,
Command: command,
- Broker: broker,
Payload: payload,
},
},