diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
commit | fe7bb0fe758d573fe353df028257ed66c6eccf66 (patch) | |
tree | 74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 | |
parent | 68ff941c4226074206ceed9c30bd95317aa0e9fc (diff) |
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
27 files changed, 328 insertions, 258 deletions
diff --git a/pkg/interface/broadcast/broadcast.go b/pkg/interface/broadcast/broadcast.go index c922c82e..4c49f7c5 100644 --- a/pkg/interface/broadcast/broadcast.go +++ b/pkg/interface/broadcast/broadcast.go @@ -1,7 +1,7 @@ package broadcast -import "github.com/spiral/roadrunner/v2/pkg/pubsub" +import "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" type Broadcaster interface { - GetDriver(key string) pubsub.SubReader + GetDriver(key string) (pubsub.SubReader, error) } diff --git a/pkg/pubsub/interface.go b/pkg/interface/pubsub/interface.go index 30b544db..30b544db 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/interface/pubsub/interface.go diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go index d39b55da..ad4ebbe7 100644 --- a/pkg/proto/websockets/v1beta/websockets.pb.go +++ b/pkg/proto/websockets/v1beta/websockets.pb.go @@ -26,9 +26,8 @@ type Message struct { unknownFields protoimpl.UnknownFields Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"` - Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"` - Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` - Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` + Topics []string `protobuf:"bytes,2,rep,name=topics,proto3" json:"topics,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *Message) Reset() { @@ -70,13 +69,6 @@ func (x *Message) GetCommand() string { return "" } -func (x *Message) GetBroker() string { - if x != nil { - return x.Broker - } - return "" -} - func (x *Message) GetTopics() []string { if x != nil { return x.Topics @@ -91,6 +83,7 @@ func (x *Message) GetPayload() []byte { return nil } +// RPC request with messages type Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -138,6 +131,7 @@ func (x *Request) GetMessages() []*Message { return nil } +// RPC response (false in case of error) type Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -190,22 +184,20 @@ var File_websockets_proto protoreflect.FileDescriptor var file_websockets_proto_rawDesc = []byte{ 0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x55, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03, - 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, - 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, + 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, + 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, + 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, + 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, + 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto index ede3cde9..5be6f70f 100644 --- a/pkg/proto/websockets/v1beta/websockets.proto +++ b/pkg/proto/websockets/v1beta/websockets.proto @@ -5,9 +5,8 @@ option go_package = "./;websocketsv1beta"; message Message { string command = 1; - string broker = 2; - repeated string topics = 3; - bytes payload = 4; + repeated string topics = 2; + bytes payload = 3; } // RPC request with messages diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 18846f30..4f1e5213 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -1,6 +1,9 @@ package broadcast /* + +# Global redis config (priority - 2) + websockets: # <----- one of possible subscribers path: /ws broker: default # <------ broadcast broker to use --------------- | @@ -8,9 +11,12 @@ websockets: # <----- one of possible subscribers broadcast: # <-------- broadcast entry point plugin | default: # <----------------------------------------------------- | driver: redis + # local redis config (priority - 1) test: driver: memory + +priority local -> global */ // Config ... diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio index b8d2947e..b2ee091a 100644 --- a/plugins/broadcast/doc/broadcast_arch.drawio +++ b/plugins/broadcast/doc/broadcast_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-17T18:52:07.846Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="eUG-4GjLSDkFvWQsKkB7" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5u4Gv41nml3xhkkcf0YO222M5vTnHg7bT+dwUa22WLLCziX/fVHAoFBkmMcrsmmlwSEJOC9v48ujNB083gdurv1DfFwMIKa9zhCVyMIoWZC+ouVPKUlCAIjLVmFvpeWgUPBzP8H80KNl+59D0elijEhQezvyoULst3iRVwqc8OQPJSrLUlQvuvOXWGpYLZwA7n0u+/F6+zFNO1w4Xfsr9axeGXjZrV5QbR2PfJQKEKfRmgaEhKnR5vHKQ4Y+TLCpO0+H7maP1mIt3GVBlPdNL9Mrrdj6wcaz7b36x/kemzqaTf3brDnr8yfNn7KaBCS/dbDrBdthCYPaz/Gs527YFcfKNtp2TreBPQM0MOlHwRTEpAwaYs8F9vLBS2P4pD8woUr5sLG8yW9Ir8Hf7V7HMb4sVDE3+sakw2OwydahV9FNqcxFzPd4OcPJZ6lZesiuxxe6HI5WeV9HyhJDzgxzyCsYbZL2KXB/ioJm/xhLcg2LpSnf5ohONBAmeKWJVNctxUUR6bRFsWNt01xWKb4OBfoHknesvXom+SCkCPN6JvijkRf7FGHxU9JGK/Jimzd4NOhdFLmwKHOH4TsON3/wnH8xL2vu49JmSv40Y9/sOYXBj/7yTtjx1ePxZOn7GRLX7fQiJ3+zPpjJ4dmyVnW7ijfIrIPF/gZ0mThgRuucPxcPc4wRrhnxSDEgRv79+VIQMVR3vSW+PSZc/GxoHYBdGgbFv9ZkiVDEzxP+ty8D0FK8od6ueBkEtmX5MCi6ICKolMUnIIcHREd0XjYC7xQhh5z22ARQhfClsVXDQvbZRi6T4UKOyY20XFZNG2jJH2mLsSKYn1Qr74Nnq+PNK1WfcMxBCVJKdKoymQO4+DcJndfL6+ml7M/m/VyNQS1ujczDYGEiogN6ApnZrQVIut9WiRQMEcHv3aeQYLnWqTlEqoZ7Zlz0zBrWSRY1f01bZBqyQCUdOzu09WXGS26/TaZfZs0q2hLbB6hv+XMtYYUTRcC9TxCLCoa7FLRenX9TSjakPQMvUo9Q5Ke3Xy6+Xr38y0pmtKjdapo1mtXNGtImma+Sk2Tcb/Lm//ejtBn9u8NaZtp9q1ttkRp+uoBGwWgN0uau/euH7hzSmT68vt5tJ/TAy+kMhBGEgsoHeIynd3AX23p8YJSDVOSThi1/IUbXPILG9/zUiXGkf9PcqOUgTwfo/0ak5FxxfqiehulKgwkVm3JFreDhBu6zCU1Et4WlwDq0ygOHLLSqpq4Ydm47LmLqXFIXG/hRkz5fKYuS2a9mjRznoFtT1eZORvOkfmsrzkjTTZfGL3rrSmQ+a5Ap/TitAIZTSvQi3A4S3Ci2fjjMdzLsexa9c0T9QFwtOcatAOsGbBfibaqi3QHoqk3bttfJpoWKkkCsM8UHaFBS6LTazRRTrGsqjnW4KwhdAYhcrYgQTqPFY+JnFgfAONcGRVatDRwII8cjKAZsOxiV5Jd8+89m/0z2VCG+TShuKRXtd0j/ZnIgZaWj2MmpuyaXrjG0pUxz1DYNZ6k5H3SoxX/7W5YZBXIZ4VAjReXm2RnSeV5WOgyLckK8uAveQB+o89zsT4t24lla6nX5kjErwV4GZ+mX1JLot41jq+SfPHDL/yUPFjob1cf6dFsP7/Drld648LbCUYqWrs7dhgmk9SKtoXQzpdBMjGMBbxHwuIGcZczYmFhVA8pUv58pl4xFrZai4Xf5z+cDHFPW/+OJkBI1tcQoaCW5zwAeUbYdzyPyOIXjmX8p05uOgeet9RU+gg0CzkNgTsAIBHxti7kKUnK9BS1NSMJWBKRv2z9+AOD2nK/8FGi9ptA26RJeSZUoAXdwm0yKnqHF5hpMdTmlUCaN8oaBRIKzC5Zk1vy15S7FH1XhfGhl7uvjOyn3RfsyH1BEUtH2gUD2B2D/6zkzM5NikAePQkQ/tEkR25gC7LaQo4De529M+xIrLoot4NKKiOxC6AhXbehZdgmLEM9urgKouW4LKNPwUd9fdim/ukXS6i0iKZJPtkmbmqUrO5Z+qs3EbEBhRfqdDgB9jseV/JCsKIXAmUvdHLeXQ3VhVVVt/Ekqh5XoaRSGS7h+fcZLPFwyH4Y5pHjFoUqilY5HLSdR7sUPOGKmgAnHl66+yCu050bBOQBe/8joZ+DPAcs5rcSLPPim+ySBXnlrmk/D9Hp7uvYnSx2TlGonswQElay6I4ibVRN/m0NyYHvo5onw4LTRsjqKn6wnYvDQhY9u2+GC3YdP8i4To40XBwQ29xIcQvFUNuxiG0XcdzXF15owqR+HcnhBVSuUGtNr2U4aEbFgQkTfsQLqpGU0NNEJF2P/trgKHJXmBphLWBa/AqZIMZ4SIU0dBvj2e/G9ShtnIrGFbWzdKvCIGW+7rQre+pIOvs1XieJ2G2wXyUpmIxdDX8ul6SZNAXuHzBHcvb77wXMgeYojGWniDmSR+vfEfNUfVRgRaeQOYJ9OrKhQ+aZhzrtysyOXJkImTtOL5A5yHbhqYyZ0xYdgObofS58A8JsdxeXPQOaOx1PZsjo0ydo7mkLjKEqbDNMA1htJVRO1YV9rSVUqN/dM0p+aHigOaq6Uq/5lKoeV1V7dElQcpoKNYeXxziKYY3Ojs3G7A607sgMiKC1Y/QNWqNeF/gO3H/bVY1AO5PPzwWt7a79tzwxa2igdUd6LYLWtgIC6Ba0RjIANsPhfcPrF/sJnmzFNoHdBk9Zx+9W8/jir9OrxLpK4UU0WhSLls2kLqNxZSxaRoRq7Z3geo6n3GHSxTpAsCWlNPuHoXUoEfpfBEOL8E/v87Z1eYeedxS6FOH3BkLrA9qobnggtF41+de7wu1EEBqWMWhg65U8Wm0UGp2JQXPXd7S+NImrVL8dxFrvedvYQcduVSU/y3d6RqzHooVsO5ST4bXOEeuu47v+kqx+oamBI9R6VXCq+TSrHldl0OgYQp1mR9NUhZpFq8E5nfWOTnek8qI3zlcbn0SnW8vo3hfHn3TBp311O7vxnItOj0HHzjoj0IDh6Y4UW4Snx4od4LqFpw0ZE2senu4JCRuD3gf3zTc/K6fLL2MYVkVLO6wtLQ153cLd7bSMtx2LbaKdu83Kcqju4nY/D/xo/YGvcPhYiISKDRpVYgw8A1sqzjqmhdym9gVC5X2BLNUQk9WpCve6tcJZKkxPbnHo0/dmfrSvcKhqPmQ0vsKsnpbK+dA+UoQZ2XZb+01wuWBLjg7Q9x/uHAe3JPIT5AFdzUkck81IxsaTLcVKO3Xt48DfUqXKPjX5LA+qq5OlCR/PUXy9ECm0yWxNm2TgZoa33ihfrcUkh7A70//czCmY8CbGI5AA0lqq/YBVtq614QhT9lS3Ibn3PcaW0k58KZNIafj0je57bgofiIJVt6prgk0T6/Y/3/+cgL+nd5vV1PxBfp99U3096jCKV8flHyGNgoBHqWUIm1wD3aw4HN2EB1eSC0rkSjPOxLWsfCo3LoeKo8Uab9zBinEtxojD0nqXK3CUfBnQ2Gc73w5SxjcnoyolrRSfBjpuGjoIoZ57yIF9cauW1ogDvtScVcQUmvg2iZLK8g4F/NskUXo3Su0Z99rDDZxqMUUHIlMUYa0SIG/NlslhbZEJ2oIl5douj6W8zAHJmf/cjTCLhxOHROOrQztWmo5j7nOPlWwdXJnHfFRj+ByW1E61uZUyc3kBUkpPQ8I26TmA3tT4rG+Ih1mN/wM=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 3b771746..c43b2e4c 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -1,25 +1,37 @@ package broadcast import ( + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) -const PluginName string = "broadcast" +const ( + PluginName string = "broadcast" + // driver is the mandatory field which should present in every storage + driver string = "driver" + + redis string = "redis" + memory string = "memory" +) type Plugin struct { sync.RWMutex - log logger.Logger + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // publishers implement Publisher interface // and able to receive a payload - publishers map[string]pubsub.Publisher + publishers map[string]pubsub.PubSub + providers map[string]pubsub.PSProvider } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } + p.cfg = &Config{} + // unmarshal config section + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + + p.publishers = make(map[string]pubsub.PubSub) + p.providers = make(map[string]pubsub.PSProvider) - p.publishers = make(map[string]pubsub.Publisher) p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("broadcast_plugin_serve") + errCh := make(chan error, 1) + + // iterate over config + for k, v := range p.cfg.Data { + if v == nil { + continue + } + + switch t := v.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + default: + p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") + continue + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, k) + + switch v.(map[string]interface{})[driver] { + case memory: + if _, ok := p.providers[memory]; !ok { + p.log.Warn("no memory drivers registered", "registered", p.publishers) + continue + } + ps, err := p.providers[memory].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case redis: + if _, ok := p.providers[redis]; !ok { + p.log.Warn("no redis drivers registered", "registered", p.publishers) + continue + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case p.cfgPlugin.Has(redis): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + continue + } + } + } + + return errCh +} + +func (p *Plugin) Stop() error { return nil } @@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) { - p.publishers[name.Name()] = subscriber +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) { + // key redis, value - interface + p.providers[name.Name()] = subscriber } // Publish is an entry point to the websocket PUBSUB @@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) { // Get payload for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.publishers[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - p.log.Error("publish async error", "error", err) + if len(p.publishers) > 0 { + for j := range p.publishers { + p.publishers[j].PublishAsync(m) } - } else { - p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker()) + return } + p.log.Warn("no publishers registered") } }() } -func (p *Plugin) GetDriver(key string) pubsub.SubReader { - println(key) - return nil +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { + const op = errors.Op("broadcast_plugin_get_driver") + // key - driver, default for example + // we should find `default` in the collected pubsubs providers + if pub, ok := p.publishers[key]; ok { + return pub, nil + } + return nil, errors.E(op, errors.Str("could not find driver by provided key")) } func (p *Plugin) RPC() interface{} { diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index efe92252..716e0d4c 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -85,6 +85,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit when user requests for example boltdb-south, we should provide that particular preconfigured storage */ for k, v := range p.cfg.Data { + // for example if the key not properly formatted (yaml) + if v == nil { + continue + } + if _, ok := v.(map[string]interface{})[driver]; !ok { errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) return errCh diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 6151ebf0..d4d535bf 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,7 +2,7 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index 75cd9d24..02246a8f 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 24c21b55..8d997041 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,7 +5,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index dbda7ea4..c2a88abe 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,8 +6,8 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -62,6 +62,11 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, MasterName: ps.cfg.MasterName, }) + statusCmd := ps.universalClient.Ping(context.Background()) + if statusCmd.Err() != nil { + return nil, statusCmd.Err() + } + ps.fanin = newFanIn(ps.universalClient, log) ps.stop() diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go index b1d5d0a8..933a12e0 100644 --- a/plugins/websockets/config.go +++ b/plugins/websockets/config.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" ) @@ -17,9 +18,9 @@ websockets: // Config represents configuration for the ws plugin type Config struct { // http path for the websocket - Path string `mapstructure:"path"` - + Path string `mapstructure:"path"` AllowedOrigin string `mapstructure:"allowed_origin"` + Broker string `mapstructure:"broker"` // wildcard origin allowedWOrigins []wildcard @@ -31,11 +32,16 @@ type Config struct { } // InitDefault initialize default values for the ws config -func (c *Config) InitDefault() { +func (c *Config) InitDefault() error { if c.Path == "" { c.Path = "/ws" } + // broker is mandatory + if c.Broker == "" { + return errors.Str("broker key should be specified") + } + if c.Pool == nil { c.Pool = &pool.Config{} if c.Pool.NumWorkers == 0 { @@ -64,7 +70,7 @@ func (c *Config) InitDefault() { if origin == "*" { // If "*" is present in the list, turn the whole list into a match all c.allowedAll = true - return + return nil } else if i := strings.IndexByte(origin, '*'); i >= 0 { // Split the origin in two: start and end string without the * w := wildcard{origin[0:i], origin[i+1:]} @@ -72,4 +78,6 @@ func (c *Config) InitDefault() { } else { c.allowedOrigins = append(c.allowedOrigins, origin) } + + return nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 07f22043..799312ad 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,8 +7,8 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -28,8 +28,8 @@ type Executor struct { // associated connection ID connID string - // map with the pubsub drivers - pubsub map[string]pubsub.Subscriber + // subscriber drivers + sub pubsub.Subscriber actualTopics map[string]struct{} req *http.Request @@ -38,12 +38,12 @@ type Executor struct { // NewExecutor creates protected connection and starts command loop func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { + connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, log: log, - pubsub: pubsubs, + sub: sub, accessValidator: av, actualTopics: make(map[string]struct{}, 10), req: r, @@ -126,11 +126,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } // subscribe to the topic - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Set(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Set(msg.Topics) + if err != nil { + return errors.E(op, err) } // handle leave @@ -155,11 +153,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Leave(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Leave(msg.Topics) + if err != nil { + return errors.E(op, err) } case commands.Headers: @@ -170,13 +166,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } } -func (e *Executor) Set(br pubsub.Subscriber, topics []string) error { +func (e *Executor) Set(topics []string) error { // associate connection with topics - err := br.Subscribe(e.connID, topics...) + err := e.sub.Subscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(e.connID, topics...) + _ = e.sub.Unsubscribe(e.connID, topics...) return err } @@ -188,9 +184,9 @@ func (e *Executor) Set(br pubsub.Subscriber, topics []string) error { return nil } -func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error { +func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage - err := br.Unsubscribe(e.connID, topics...) + err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) return err @@ -207,10 +203,7 @@ func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error { func (e *Executor) CleanUp() { // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // here - for _, ps := range e.pubsub { - _ = ps.Unsubscribe(e.connID, topic) - } + _ = e.sub.Unsubscribe(e.connID, topic) } // clean up the actualTopics data diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go index e877fad3..ec6e1960 100644 --- a/plugins/websockets/origin_test.go +++ b/plugins/websockets/origin_test.go @@ -11,7 +11,8 @@ func TestConfig_Origin(t *testing.T) { AllowedOrigin: "*", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg)) assert.True(t, isOriginAllowed("http://", cfg)) @@ -29,7 +30,8 @@ func TestConfig_OriginWildCard(t *testing.T) { AllowedOrigin: "https://*my.site.com", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("https://my.site.com", cfg)) assert.False(t, isOriginAllowed("http://", cfg)) @@ -50,7 +52,8 @@ func TestConfig_OriginWildCard2(t *testing.T) { AllowedOrigin: "https://my.*.com", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("https://my.site.com", cfg)) assert.False(t, isOriginAllowed("http://", cfg)) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index cf861c72..de7443fd 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -9,13 +9,12 @@ import ( "github.com/fasthttp/websocket" "github.com/google/uuid" json "github.com/json-iterator/go" - endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/interface/broadcast" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -33,16 +32,14 @@ const ( type Plugin struct { sync.RWMutex - // Collection with all available pubsubs - //pubsubs map[string]pubsub.PubSub - //psProviders map[string]pubsub.PSProvider + // subscriber+reader interfaces + subReader pubsub.SubReader + // broadcaster + broadcaster broadcast.Broadcaster - subReaders map[string]pubsub.SubReader - - cfg *Config - cfgPlugin config.Configurer - log logger.Logger + cfg *Config + log logger.Logger // global connections map connections sync.Map @@ -53,8 +50,10 @@ type Plugin struct { wsUpgrade *websocket.Upgrader serveExit chan struct{} + // workers pool phpPool phpPool.Pool - server server.Server + // server which produces commands to the pool + server server.Server // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn @@ -71,14 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.cfg.InitDefault() - //p.pubsubs = make(map[string]pubsub.PubSub) - //p.psProviders = make(map[string]pubsub.PSProvider) - - p.subReaders = make(map[string]pubsub.SubReader) - - p.log = log - p.cfgPlugin = cfg + err = p.cfg.InitDefault() + if err != nil { + return errors.E(op, err) + } p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, @@ -90,19 +85,21 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.serveExit = make(chan struct{}) p.server = server - + p.log = log + p.broadcaster = b return nil } func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) const op = errors.Op("websockets_plugin_serve") - - //err := p.initPubSubs() - //if err != nil { - // errCh <- errors.E(op, err) - // return errCh - //} + errCh := make(chan error, 1) + // init broadcaster + var err error + p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } go func() { var err error @@ -124,78 +121,28 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() - p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log) + p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log) // run all pubsubs drivers - for _, v := range p.subReaders { - go func(ps pubsub.SubReader) { - for { - select { - case <-p.serveExit: + go func(ps pubsub.Reader) { + for { + select { + case <-p.serveExit: + return + default: + data, err := ps.Next() + if err != nil { + errCh <- err return - default: - data, err := ps.Next() - if err != nil { - errCh <- err - return - } - p.workersPool.Queue(data) } + p.workersPool.Queue(data) } - }(v) - } + } + }(p.subReader) return errCh } -//func (p *Plugin) initPubSubs() error { -// for i := 0; i < len(p.cfg.PubSubs); i++ { -// // don't need to have a section for the in-memory -// if p.cfg.PubSubs[i] == "memory" { -// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { -// r, err := provider.PSProvide("") -// if err != nil { -// return err -// } -// -// // append default in-memory provider -// p.pubsubs["memory"] = r -// } -// continue -// } -// // key - memory, redis -// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { -// // try local key -// switch { -// // try local config first -// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): -// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) -// if err != nil { -// return err -// } -// -// // append redis provider -// p.pubsubs[p.cfg.PubSubs[i]] = r -// case p.cfgPlugin.Has(p.cfg.PubSubs[i]): -// r, err := provider.PSProvide(p.cfg.PubSubs[i]) -// if err != nil { -// return err -// } -// -// // append redis provider -// p.pubsubs[p.cfg.PubSubs[i]] = r -// default: -// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) -// } -// } else { -// // no such driver -// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) -// } -// } -// -// return nil -//} - func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() @@ -210,23 +157,12 @@ func (p *Plugin) Stop() error { return nil } -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.GetSubsReader, - } -} - func (p *Plugin) Available() {} func (p *Plugin) Name() string { return PluginName } -// GetSubsReader collects all plugins which implement SubReader interface -func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) { - p.subReaders[name.Name()] = pub -} - func (p *Plugin) Middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != p.cfg.Path { @@ -272,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.connections.Store(connectionID, safeConn) // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) err = e.StartCommandLoop() diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 22042d8d..cd9444da 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,15 +4,15 @@ import ( "sync" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage map[string]pubsub.SubReader + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger @@ -22,11 +22,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -90,19 +90,13 @@ func (wp *WorkersPool) do() { //nolint:gocognit continue } - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) - continue - } - // send a message to every topic for i := 0; i < len(msg.GetTopics()); i++ { // get free map res := wp.get() // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) + wp.subscriber.Connections(msg.GetTopics()[i], res) if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) @@ -114,7 +108,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit for topic := range res { c, ok := wp.connections.Load(topic) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i]) wp.put(res) continue } @@ -135,7 +129,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit err = c.(*connection.Connection).Write(d) if err != nil { for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i]) } wp.put(res) continue diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 65ee4415..5b195bd0 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -98,8 +98,7 @@ func TestBroadcastInit(t *testing.T) { } }() - time.Sleep(time.Second * 1) - //t.Run("TestWSInit", wsInit) + t.Run("TestWSInit", wsInit) stopCh <- struct{}{} @@ -186,7 +185,7 @@ func messageWS(command string, broker string, payload []byte, topics ...string) return &websocketsv1.Message{ Topics: topics, Command: command, - Broker: broker, + //Broker: broker, Payload: payload, } } @@ -197,7 +196,7 @@ func makeMessage(command string, broker string, payload []byte, topics ...string { Topics: topics, Command: command, - Broker: broker, + //Broker: broker, Payload: payload, }, }, diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml index fa4116d0..6962eeb5 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -23,19 +23,21 @@ redis: addrs: - "localhost:6379" + broadcast: - default: - driver: redis - test: - driver: memory + default: + driver: redis + addrs: + - "localhost:6379" websockets: - pubsubs: [ "redis" ] + broker: default + allowed_origin: "*" path: "/ws" logs: mode: development - level: error + level: debug endure: grace_period: 120s diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index dc073be3..b6882d84 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -23,12 +23,18 @@ redis: addrs: - "localhost:6379" +broadcast: + default: + driver: redis + addrs: + - "localhost:6379" + websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section or its own - pubsubs: [ "redis" ] + broker: default + allowed_origin: "*" path: "/ws" + logs: mode: development level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml index 896cee05..f81e13e3 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-allow.yaml @@ -23,8 +23,13 @@ redis: addrs: - "localhost:6379" +broadcast: + test: + driver: memory + websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml index e3bf5218..decb7dcf 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-deny.yaml @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml index 0614f4e7..5377aef2 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-memory-stop.yaml @@ -19,12 +19,13 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory websockets: - pubsubs: [ "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml index 27eab557..a077bf9e 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml @@ -20,17 +20,18 @@ http: destroy_timeout: 60s -websockets: - pubsubs: [ "redis", "memory" ] - redis: - addrs: - - "localhost:6379" +broadcast: + test: + driver: memory +websockets: + broker: test + allowed_origin: "*" path: "/ws" logs: mode: development - level: error + level: debug endure: grace_period: 120s diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml index fd125794..d80993f2 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml @@ -21,7 +21,8 @@ http: websockets: - pubsubs: [ "redis", "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml index eedf5377..3557f5f1 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-redis-memory.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-redis.yaml @@ -23,10 +23,13 @@ redis: addrs: - "localhost:6379" +broadcast: + test: + driver: redis + websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - # pubsubs might use general config section - pubsubs: [ "redis", "memory" ] + broker: test + allowed_origin: "*" path: "/ws" logs: diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 8321297d..cb78117f 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -17,6 +17,7 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -47,6 +48,7 @@ func TestBroadcastInit(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -98,6 +100,8 @@ func TestBroadcastInit(t *testing.T) { time.Sleep(time.Second * 1) t.Run("TestWSInit", wsInit) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) + t.Run("RPCWsMemory", RPCWsMemory) stopCh <- struct{}{} @@ -119,7 +123,7 @@ func wsInit(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -156,6 +160,7 @@ func TestWSRedisAndMemory(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -219,7 +224,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { assert.NoError(t, err) cfg := &config.Viper{ - Path: "configs/.rr-websockets-redis-memory.yaml", + Path: "configs/.rr-websockets-redis.yaml", Prefix: "rr", } @@ -232,6 +237,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -281,8 +287,7 @@ func TestWSRedisAndMemoryGlobal(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) - t.Run("RPCWsMemory", RPCWsMemory) + t.Run("RPCWsRedis", RPCWsRedis) stopCh <- struct{}{} @@ -308,6 +313,7 @@ func TestWSRedisNoSection(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -326,7 +332,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -335,7 +341,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -359,7 +365,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -386,7 +392,7 @@ func RPCWsMemoryPubAsync(t *testing.T) { _, msg, err = c.ReadMessage() retMsg = utils.AsString(msg) assert.NoError(t, err) - assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP2\"}", retMsg) + assert.Equal(t, "{\"topic\":\"foo2\",\"payload\":\"hello, PHP\"}", retMsg) err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) assert.NoError(t, err) @@ -398,7 +404,7 @@ func RPCWsMemory(t *testing.T) { HandshakeTimeout: time.Second * 20, } - connURL := url.URL{Scheme: "ws", Host: "localhost:13235", Path: "/ws"} + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} c, resp, err := da.Dial(connURL.String(), nil) assert.NoError(t, err) @@ -409,7 +415,7 @@ func RPCWsMemory(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -433,7 +439,7 @@ func RPCWsMemory(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -481,7 +487,7 @@ func RPCWsRedis(t *testing.T) { _ = resp.Body.Close() }() - d, err := json.Marshal(messageWS("join", "redis", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -505,7 +511,7 @@ func RPCWsRedis(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "redis", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -556,6 +562,7 @@ func TestWSMemoryDeny(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -631,7 +638,7 @@ func RPCWsMemoryDeny(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -647,7 +654,7 @@ func RPCWsMemoryDeny(t *testing.T) { assert.Equal(t, `{"topic":"#join","payload":["foo","foo2"]}`, retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -684,6 +691,7 @@ func TestWSMemoryStop(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -777,6 +785,7 @@ func TestWSMemoryOk(t *testing.T) { &websockets.Plugin{}, &httpPlugin.Plugin{}, &memory.Plugin{}, + &broadcast.Plugin{}, ) assert.NoError(t, err) @@ -852,7 +861,7 @@ func RPCWsMemoryAllow(t *testing.T) { } }() - d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + d, err := json.Marshal(messageWS("join", []byte("hello websockets"), "foo", "foo2")) if err != nil { panic(err) } @@ -876,7 +885,7 @@ func RPCWsMemoryAllow(t *testing.T) { assert.Equal(t, "{\"topic\":\"foo\",\"payload\":\"hello, PHP\"}", retMsg) // //// LEAVE foo, foo2 ///////// - d, err = json.Marshal(messageWS("leave", "memory", []byte("hello websockets"), "foo")) + d, err = json.Marshal(messageWS("leave", []byte("hello websockets"), "foo")) if err != nil { panic(err) } @@ -909,7 +918,7 @@ func RPCWsMemoryAllow(t *testing.T) { assert.NoError(t, err) } -func publish(command string, broker string, topics ...string) { +func publish(command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -918,13 +927,13 @@ func publish(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP"), topics...), ret) if err != nil { panic(err) } } -func publishAsync(t *testing.T, command string, broker string, topics ...string) { +func publishAsync(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -933,12 +942,12 @@ func publishAsync(t *testing.T, command string, broker string, topics ...string) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publishAsync2(t *testing.T, command string, broker string, topics ...string) { +func publishAsync2(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -947,12 +956,12 @@ func publishAsync2(t *testing.T, command string, broker string, topics ...string client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.PublishAsync", makeMessage(command, []byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func publish2(t *testing.T, command string, broker string, topics ...string) { +func publish2(t *testing.T, command string, topics ...string) { conn, err := net.Dial("tcp", "127.0.0.1:6001") if err != nil { panic(err) @@ -961,27 +970,25 @@ func publish2(t *testing.T, command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) ret := &websocketsv1.Response{} - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + err = client.Call("broadcast.Publish", makeMessage(command, []byte("hello, PHP2"), topics...), ret) assert.NoError(t, err) assert.True(t, ret.Ok) } -func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { +func messageWS(command string, payload []byte, topics ...string) *websocketsv1.Message { return &websocketsv1.Message{ Topics: topics, Command: command, - Broker: broker, Payload: payload, } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request { +func makeMessage(command string, payload []byte, topics ...string) *websocketsv1.Request { m := &websocketsv1.Request{ Messages: []*websocketsv1.Message{ { Topics: topics, Command: command, - Broker: broker, Payload: payload, }, }, |