summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
committerValery Piashchynski <[email protected]>2021-05-27 00:09:33 +0300
commitdc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch)
tree6ba562da6de7f32a8d528b72cbb56a8bc98c1b30
parentd2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff)
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/pool/interface.go2
-rw-r--r--pkg/pubsub/interface.go38
-rw-r--r--pkg/pubsub/message.go64
-rw-r--r--plugins/broadcast/config.go29
-rw-r--r--plugins/broadcast/doc/.rr-broadcast.yaml29
-rw-r--r--plugins/broadcast/doc/broadcast.drawio151
-rw-r--r--plugins/broadcast/interface.go41
-rw-r--r--plugins/broadcast/memory/driver.go29
-rw-r--r--plugins/broadcast/plugin.go105
-rw-r--r--plugins/broadcast/redis/driver.go29
-rw-r--r--plugins/broadcast/redis/plugin.go1
-rw-r--r--plugins/broadcast/redis/storage.go1
-rw-r--r--plugins/broadcast/rpc.go26
-rw-r--r--plugins/broadcast/ws/commands/join.go10
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/config.go26
-rw-r--r--plugins/broadcast/ws/plugin.go59
-rw-r--r--plugins/broadcast/ws/subscriber.go50
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
-rw-r--r--plugins/kv/drivers/redis/plugin.go2
-rw-r--r--plugins/memory/bst/bst.go (renamed from plugins/broadcast/memory/bst/bst.go)26
-rw-r--r--plugins/memory/bst/bst_test.go (renamed from plugins/broadcast/memory/bst/bst_test.go)4
-rw-r--r--plugins/memory/bst/interface.go (renamed from plugins/broadcast/memory/bst/interface.go)0
-rw-r--r--plugins/memory/config.go (renamed from plugins/broadcast/memory/config.go)4
-rw-r--r--plugins/memory/driver.go28
-rw-r--r--plugins/memory/plugin.go (renamed from plugins/broadcast/memory/plugin.go)42
-rw-r--r--plugins/redis/fanin.go100
-rw-r--r--plugins/redis/plugin.go135
-rw-r--r--plugins/websockets/commands/enums.go9
-rw-r--r--plugins/websockets/config.go67
-rw-r--r--plugins/websockets/connection/connection.go (renamed from plugins/broadcast/ws/connection/connection.go)6
-rw-r--r--plugins/websockets/doc/broadcast.drawio1
-rw-r--r--plugins/websockets/doc/broadcast_arch.drawio1
-rw-r--r--plugins/websockets/doc/ws.drawio (renamed from plugins/broadcast/doc/ws.drawio)0
-rw-r--r--plugins/websockets/executor/executor.go146
-rw-r--r--plugins/websockets/plugin.go206
-rw-r--r--plugins/websockets/pool/workers_pool.go104
-rw-r--r--plugins/websockets/rpc.go46
-rw-r--r--plugins/websockets/storage/storage.go50
-rw-r--r--plugins/websockets/validator/access_validator.go102
-rw-r--r--plugins/websockets/validator/access_validator_test.go35
-rw-r--r--tests/Dockerfile0
-rw-r--r--tests/docker-compose.yaml3
-rw-r--r--tests/plugins/informer/.rr-informer.yaml6
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml50
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go102
47 files changed, 1302 insertions, 678 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4ef2f2e7..c22fbbd3 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -18,7 +18,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
- // Remove worker from the pool.
+ // RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..80dab0c3
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,38 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (Message, error)
+}
+
+type Message interface {
+ Command() string
+ Payload() []byte
+ Topics() []string
+ Broker() string
+}
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..ab74eb98
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,64 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Msg struct {
+ // Topic message been pushed into.
+ T []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ C string `json:"command"`
+
+ // Broker (redis, memory)
+ B string `json:"broker"`
+
+ // Payload to be broadcasted
+ P []byte `json:"payload"`
+}
+
+//func (m Msg) UnmarshalBinary(data []byte) error {
+// //Use default gob decoder
+// reader := bytes.NewReader(data)
+// dec := gob.NewDecoder(reader)
+// if err := dec.Decode(&m); err != nil {
+// return err
+// }
+//
+// return nil
+//}
+
+func (m *Msg) MarshalBinary() ([]byte, error) {
+ //buf := new(bytes.Buffer)
+ //
+ //for i := 0; i < len(m.T); i++ {
+ // buf.WriteString(m.T[i])
+ //}
+ //
+ //buf.WriteString(m.C)
+ //buf.WriteString(m.B)
+ //buf.Write(m.P)
+
+ return json.Marshal(m)
+
+}
+
+// Payload in raw bytes
+func (m *Msg) Payload() []byte {
+ return m.P
+}
+
+// Command for the connection
+func (m *Msg) Command() string {
+ return m.C
+}
+
+// Topics to subscribe
+func (m *Msg) Topics() []string {
+ return m.T
+}
+
+func (m *Msg) Broker() string {
+ return m.B
+}
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
deleted file mode 100644
index 5e7b7f20..00000000
--- a/plugins/broadcast/config.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package broadcast
-
-/*
-broadcast:
- ws-us-region-1:
- subscriber: websockets
- middleware: ["headers", "gzip"] # ????
- address: "localhost:53223"
- path: "/ws"
-
- storage: redis
- address:
- - 6379
- db: 0
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // Sections represent particular broadcast plugin section
- Sections map[string]interface{} `mapstructure:"sections"`
-}
-
-func (c *Config) InitDefaults() {
-
-}
-
-func (c *Config) Valid() error {
- return nil
-}
diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml
deleted file mode 100644
index 8b0eef20..00000000
--- a/plugins/broadcast/doc/.rr-broadcast.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-broadcast:
- # path to enable web-socket handler middleware
- path: /ws
-
- # optional, redis broker configuration
- redis:
- addrs:
- - "localhost:6379"
- # if a MasterName is passed a sentinel-backed FailoverClient will be returned
- master_name: ""
- username: ""
- password: ""
- db: 0
- sentinel_password: ""
- route_by_latency: false
- route_randomly: false
- dial_timeout: 0 # accepted values [1s, 5m, 3h]
- max_retries: 1
- min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
- max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
- pool_size: 0
- min_idle_conns: 0
- max_conn_age: 0 # accepted values [1s, 5m, 3h]
- read_timeout: 0 # accepted values [1s, 5m, 3h]
- write_timeout: 0 # accepted values [1s, 5m, 3h]
- pool_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_check_freq: 0 # accepted values [1s, 5m, 3h]
- read_only: false
diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio
deleted file mode 100644
index 2339f5b1..00000000
--- a/plugins/broadcast/doc/broadcast.drawio
+++ /dev/null
@@ -1,151 +0,0 @@
-<mxfile>
- <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">
- <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0">
- <root>
- <mxCell id="0"/>
- <mxCell id="1" parent="0"/>
- <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry">
- <mxPoint x="620" y="440" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="460" y="240"/>
- <mxPoint x="270" y="240"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1">
- <mxGeometry x="0.0526" y="10" relative="1" as="geometry">
- <mxPoint x="459.72413793103465" y="510" as="targetPoint"/>
- <mxPoint x="-10" y="10" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="240" y="160"/>
- <mxPoint x="696" y="160"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0">
- <mxGeometry x="-0.2172" relative="1" as="geometry">
- <mxPoint x="108.62" y="-10" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1">
- <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1">
- <mxGeometry relative="1" as="geometry">
- <mxPoint x="50" y="440.2068965517242" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
- <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry x="0.1429" y="15" relative="1" as="geometry">
- <mxPoint as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0">
- <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry">
- <mxPoint x="-33.3" y="-13" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1">
- <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0">
- <mxGeometry x="0.2355" y="-1" relative="1" as="geometry">
- <mxPoint x="9" y="12.69" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1">
- <mxGeometry x="0.2" y="-10" relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="680" y="620"/>
- <mxPoint x="680" y="620"/>
- </Array>
- <mxPoint as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
- <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&amp;gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="&lt;p style=&quot;margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline&quot;&gt;&lt;b&gt;Structure&lt;/b&gt;&lt;/p&gt;&lt;hr&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Topic = string&lt;br&gt;Payload = raw&lt;br&gt;&lt;/p&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Broker = ws (from config) (hardcoded)&lt;br&gt;&lt;/p&gt;" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1">
- <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1">
- <mxGeometry width="50" height="50" relative="1" as="geometry">
- <mxPoint x="740" y="608" as="sourcePoint"/>
- <mxPoint x="790" y="558" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="160" y="520"/>
- <mxPoint x="160" y="380"/>
- <mxPoint x="270" y="380"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0">
- <mxGeometry x="-0.317" y="3" relative="1" as="geometry">
- <mxPoint x="-7" y="-10.34" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="&lt;ul&gt;&lt;li&gt;Check registered topics&lt;/li&gt;&lt;li&gt;Redirect to driver&lt;/li&gt;&lt;/ul&gt;" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1">
- <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/>
- </mxCell>
- </root>
- </mxGraphModel>
- </diagram>
-</mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
deleted file mode 100644
index 47c779b5..00000000
--- a/plugins/broadcast/interface.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package broadcast
-
-import (
- "encoding/json"
-)
-
-// Subscriber defines the ability to operate as message passing broker.
-type Subscriber interface {
- // Subscribe broker to one or multiple topics.
- Subscribe(topics ...string) error
- // UnsubscribePattern broker from pattern.
- UnsubscribePattern(pattern string) error
-}
-
-// Storage used to store patterns and topics
-type Storage interface {
- // Store connection uuid associated with the provided topics
- Store(uuid string, topics ...string)
- // StorePattern stores pattern associated with the particular connection
- StorePattern(uuid string, pattern string)
-
- // GetConnection returns connections for the particular pattern
- GetConnection(pattern string) []string
-
- // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example)
- Construct(key string) (Storage, error)
-}
-
-type Publisher interface {
- // Publish one or multiple Channel.
- Publish(messages ...*Message) error
-}
-
-// Message represent single message.
-type Message struct {
- // Topic message been pushed into.
- Topic string `json:"topic"`
-
- // Payload to be broadcasted. Must be valid json when transferred over RPC.
- Payload json.RawMessage `json:"payload"`
-}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
deleted file mode 100644
index 80527e4b..00000000
--- a/plugins/broadcast/memory/driver.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package memory
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
-)
-
-type Driver struct {
-}
-
-func NewInMemoryDriver() broadcast.Storage {
- b := &Driver{}
- return b
-}
-
-func (d *Driver) Store(uuid string, topics ...string) {
- panic("implement me")
-}
-
-func (d *Driver) StorePattern(uuid string, pattern string) {
- panic("implement me")
-}
-
-func (d *Driver) GetConnection(pattern string) []string {
- panic("implement me")
-}
-
-func (d *Driver) Construct(key string) (broadcast.Storage, error) {
- return nil, nil
-}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
deleted file mode 100644
index 156bea80..00000000
--- a/plugins/broadcast/plugin.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package broadcast
-
-import (
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "broadcast"
-)
-
-type Plugin struct {
- broker Subscriber
- driver Storage
-
- log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("broadcast_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
-
- p.cfg.InitDefaults()
-
- p.log = log
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("broadcast_plugin_serve")
- errCh := make(chan error)
-
- // if there are no brokers, return nil
- if p.broker == nil {
- errCh <- errors.E(op, errors.Str("no broker detected"))
- return errCh
- }
-
- if p.driver == nil {
- // Or if no storage detected, use in-memory storage
- errCh <- errors.E(op, errors.Str("no storage detected"))
- return errCh
- }
-
- // start the underlying broker
- go func() {
- // err := p.broker.Serve()
- // if err != nil {
- // errCh <- errors.E(op, err)
- // }
- }()
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.CollectSubscriber,
- }
-}
-
-func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) {
- p.broker = subscriber
-}
-
-func (p *Plugin) CollectStorage(name endure.Named, storage Storage) {
- p.driver = storage
-}
-
-func (p *Plugin) RPC() interface{} {
- // create an RPC service for the collects
- r := &rpc{
- log: p.log,
- svc: p,
- }
- return r
-}
-
-func (p *Plugin) Publish(msg []*Message) error {
- const op = errors.Op("broadcast_plugin_publish")
- return nil
-}
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
deleted file mode 100644
index 556d5f03..00000000
--- a/plugins/broadcast/redis/driver.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package redis
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
-)
-
-type Driver struct {
-}
-
-func NewInMemoryDriver() broadcast.Storage {
- b := &Driver{}
- return b
-}
-
-func (d *Driver) Store(uuid string, topics ...string) {
- panic("implement me")
-}
-
-func (d *Driver) StorePattern(uuid string, pattern string) {
- panic("implement me")
-}
-
-func (d *Driver) GetConnection(pattern string) []string {
- panic("implement me")
-}
-
-func (d *Driver) Construct(key string) (broadcast.Storage, error) {
- panic("implement me")
-}
diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go
deleted file mode 100644
index 65a229e1..00000000
--- a/plugins/broadcast/redis/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package redis
diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go
deleted file mode 100644
index 65a229e1..00000000
--- a/plugins/broadcast/redis/storage.go
+++ /dev/null
@@ -1 +0,0 @@
-package redis
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
deleted file mode 100644
index 948fd7ae..00000000
--- a/plugins/broadcast/rpc.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package broadcast
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type rpc struct {
- log logger.Logger
- svc *Plugin
-}
-
-func (r *rpc) Publish(msg []*Message, ok *bool) error {
- const op = errors.Op("broadcast_publish")
- err := r.svc.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
- }
- *ok = true
- return nil
-}
-
-func (r *rpc) PublishAsync() {
-
-}
diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go
deleted file mode 100644
index 25943f0a..00000000
--- a/plugins/broadcast/ws/commands/join.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package commands
-
-// Join command to save the connection
-type Join struct {
- Command string `mapstructure:"command"`
-}
-
-func JoinCommand() {
-
-}
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/leave.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/subscribe.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go
deleted file mode 100644
index 1d4132b4..00000000
--- a/plugins/broadcast/ws/config.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package ws
-
-/*
-broadcast:
- ws-us-region-1:
- subscriber: ws
- path: "/ws"
-
- driver: redis
- address:
- - 6379
- db: 0
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // http path for the websocket
- Path string `mapstructure:"Path"`
-}
-
-// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
- if c.Path == "" {
- c.Path = "/ws"
- }
-}
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
deleted file mode 100644
index f075864b..00000000
--- a/plugins/broadcast/ws/plugin.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package ws
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- RootPluginName = "broadcast"
- PluginName = "websockets"
-)
-
-type Plugin struct {
- // logger
- log logger.Logger
- // configurer plugin
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("ws_plugin_init")
-
- // check for the configuration section existence
- if !cfg.Has(RootPluginName) {
- return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration"))
- }
-
- p.cfg = cfg
- p.log = log
-
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// Provides Provide a ws implementation
-func (p *Plugin) Provides() []interface{} {
- return []interface{}{
- p.Websocket,
- }
-}
-
-// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
- const op = errors.Op("websocket_subscriber_provide")
- // initialize subscriber with the storage
- ws, err := NewWSSubscriber(storage)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return ws, nil
-}
-
-func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
deleted file mode 100644
index 660efdca..00000000
--- a/plugins/broadcast/ws/subscriber.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
-)
-
-type Subscriber struct {
- connections map[string]*connection.Connection
- storage broadcast.Storage
-}
-
-// config
-//
-func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
- m := make(map[string]*connection.Connection)
-
- go func() {
- app := fiber.New()
- app.Use("/ws", wsMiddleware)
- app.Listen(":8080")
- }()
-
- return &Subscriber{
- connections: m,
- storage: storage,
- }, nil
-}
-
-func (s *Subscriber) Subscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) SubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Unsubscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) UnsubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
- s.storage.GetConnection(messages[9].Topic)
- return nil
-}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
deleted file mode 100644
index 068ef9fb..00000000
--- a/plugins/broadcast/ws/ws_middleware.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/gofiber/websocket/v2"
-)
-
-func wsMiddleware(c *fiber.Ctx) error {
- if websocket.IsWebSocketUpgrade(c) {
- return c.Next()
- }
- return fiber.ErrUpgradeRequired
-}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
index d2183411..3694c5a7 100644
--- a/plugins/kv/drivers/redis/plugin.go
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
// Serve is noop here
func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
+ return make(chan error)
}
func (s *Plugin) Stop() error {
diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/memory/bst/bst.go
index 7d09a10f..3060ff11 100644
--- a/plugins/broadcast/memory/bst/bst.go
+++ b/plugins/memory/bst/bst.go
@@ -17,8 +17,8 @@ func NewBST() Storage {
}
// Insert uuid to the topic
-func (B *BST) Insert(uuid string, topic string) {
- curr := B
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
for {
if curr.topic == topic {
@@ -50,29 +50,31 @@ func (B *BST) Insert(uuid string, topic string) {
}
}
-func (B *BST) Get(topic string) map[string]struct{} {
- curr := B
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
for curr != nil {
if curr.topic == topic {
return curr.uuids
}
if curr.topic < topic {
curr = curr.left
+ continue
}
if curr.topic > topic {
curr = curr.right
+ continue
}
}
return nil
}
-func (B *BST) Remove(uuid string, topic string) {
- B.removeHelper(uuid, topic, nil)
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
}
-func (B *BST) removeHelper(uuid string, topic string, parent *BST) {
- curr := B
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
for curr != nil {
if topic < curr.topic {
parent = curr
@@ -126,9 +128,9 @@ func (B *BST) removeHelper(uuid string, topic string, parent *BST) {
}
//go:inline
-func (B *BST) traverseForMinString() (string, map[string]struct{}) {
- if B.left == nil {
- return B.topic, B.uuids
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
}
- return B.left.traverseForMinString()
+ return b.left.traverseForMinString()
}
diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go
index b5ad6c10..e8a13760 100644
--- a/plugins/broadcast/memory/bst/bst_test.go
+++ b/plugins/memory/bst/bst_test.go
@@ -8,6 +8,7 @@ import (
)
func TestNewBST(t *testing.T) {
+ // create a new bst
g := NewBST()
for i := 0; i < 100; i++ {
@@ -22,12 +23,15 @@ func TestNewBST(t *testing.T) {
g.Insert(uuid.NewString(), "comments3")
}
+ // should be 100
exist := g.Get("comments")
assert.Len(t, exist, 100)
+ // should be 100
exist2 := g.Get("comments2")
assert.Len(t, exist2, 100)
+ // should be 100
exist3 := g.Get("comments3")
assert.Len(t, exist3, 100)
}
diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/memory/bst/interface.go
index ecf40414..ecf40414 100644
--- a/plugins/broadcast/memory/bst/interface.go
+++ b/plugins/memory/bst/interface.go
diff --git a/plugins/broadcast/memory/config.go b/plugins/memory/config.go
index e80695bc..08dd9fc3 100644
--- a/plugins/broadcast/memory/config.go
+++ b/plugins/memory/config.go
@@ -1,6 +1,8 @@
package memory
// Config for the memory driver is empty, it's just a placeholder
-type Config struct{}
+type Config struct {
+ Path string `mapstructure:"path"`
+}
func (c *Config) InitDefaults() {}
diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go
new file mode 100644
index 00000000..5a96e773
--- /dev/null
+++ b/plugins/memory/driver.go
@@ -0,0 +1,28 @@
+package memory
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Driver struct {
+ tree bst.Storage
+}
+
+func NewInMemoryDriver() bst.Storage {
+ b := &Driver{
+ tree: bst.NewBST(),
+ }
+ return b
+}
+
+func (d *Driver) Insert(uuid string, topic string) {
+ d.tree.Insert(uuid, topic)
+}
+
+func (d *Driver) Remove(uuid, topic string) {
+ d.tree.Remove(uuid, topic)
+}
+
+func (d *Driver) Get(topic string) map[string]struct{} {
+ return d.tree.Get(topic)
+}
diff --git a/plugins/broadcast/memory/plugin.go b/plugins/memory/plugin.go
index 2bd894a0..5efd5522 100644
--- a/plugins/broadcast/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -1,17 +1,14 @@
package memory
import (
- "fmt"
-
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
const (
- PluginName string = "broadcast"
- SectionName string = "memory"
+ PluginName string = "memory"
)
type Plugin struct {
@@ -26,17 +23,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return errors.E(op, errors.Disabled)
}
- if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
-
- p.cfg.InitDefaults()
-
p.log = log
return nil
}
@@ -49,7 +35,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
-
return nil
}
@@ -58,10 +43,25 @@ func (p *Plugin) Available() {}
// Name is endure.Named interface implementation
func (p *Plugin) Name() string {
- // broadcast.memory
- return fmt.Sprintf("%s.%s", PluginName, SectionName)
+ return PluginName
}
-func (p *Plugin) Publish(msg []*broadcast.Message) error {
- return nil
+func (p *Plugin) Publish(messages []pubsub.Message) error {
+ panic("implement me")
+}
+
+func (p *Plugin) PublishAsync(messages []pubsub.Message) {
+ panic("implement me")
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Next() (pubsub.Message, error) {
+ panic("implement me")
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..29016720
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ //here we receive message from us (which we sent before in Publish)
+ //it should be compatible with the websockets.Msg interface
+ //payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Msg{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan pubsub.Message {
+ return fi.out
+}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index f0011690..24ed1f92 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,72 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
+}
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
new file mode 100644
index 00000000..18c63be3
--- /dev/null
+++ b/plugins/websockets/commands/enums.go
@@ -0,0 +1,9 @@
+package commands
+
+type Command string
+
+const (
+ Leave string = "leave"
+ Join string = "join"
+ Headers string = "headers"
+)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
new file mode 100644
index 00000000..f3cb8e12
--- /dev/null
+++ b/plugins/websockets/config.go
@@ -0,0 +1,67 @@
+package websockets
+
+import "time"
+
+/*
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs:["redis", "amqp", "memory"]
+
+ # sample of the own config section for the redis pubsub driver
+ redis:
+ address:
+ - localhost:1111
+ .... the rest
+
+
+ # path used as websockets path
+ path: "/ws"
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"path"`
+ // ["redis", "amqp", "memory"]
+ PubSubs []string `mapstructure:"pubsubs"`
+ Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+}
+
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+ if len(c.PubSubs) == 0 {
+ // memory used by default
+ c.PubSubs = append(c.PubSubs, "memory")
+ }
+}
diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/websockets/connection/connection.go
index cfb47e35..5eb30c61 100644
--- a/plugins/broadcast/ws/connection/connection.go
+++ b/plugins/websockets/connection/connection.go
@@ -3,7 +3,7 @@ package connection
import (
"sync"
- "github.com/gofiber/websocket/v2"
+ "github.com/fasthttp/websocket"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -43,8 +43,8 @@ func (c *Connection) Write(mt int, data []byte) error {
}
func (c *Connection) Read() (int, []byte, error) {
- c.RLock()
- defer c.RUnlock()
+ //c.RLock()
+ //defer c.RUnlock()
const op = errors.Op("websocket_read")
mt, data, err := c.conn.ReadMessage()
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
new file mode 100644
index 00000000..748fec45
--- /dev/null
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..54bba9c7
--- /dev/null
+++ b/plugins/websockets/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/websockets/doc/ws.drawio
index 739b797a..739b797a 100644
--- a/plugins/broadcast/doc/ws.drawio
+++ b/plugins/websockets/doc/ws.drawio
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..391c9a8c
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,146 @@
+package executor
+
+import (
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+ pubsub pubsub.PubSub
+}
+
+// NewExecutor creates protected connection and starts command loop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error {
+ for {
+ mt, data, err := e.conn.Read()
+ if err != nil {
+ if mt == -1 {
+ return err
+ }
+
+ return err
+ }
+
+ msg := &pubsub.Msg{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ switch msg.Command() {
+ // handle leave
+ case commands.Join:
+ // TODO access validators model update
+ //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
+ //// validation error
+ //if err != nil {
+ // e.log.Error("validation error", "error", err)
+ //
+ // resp := &Response{
+ // Topic: "#join",
+ // Payload: msg.Topics(),
+ // }
+ //
+ // packet, err := json.Marshal(resp)
+ // if err != nil {
+ // e.log.Error("error marshal the body", "error", err)
+ // return err
+ // }
+ //
+ // err = e.conn.Write(websocket.BinaryMessage, packet)
+ // if err != nil {
+ // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ // continue
+ // }
+ //
+ // continue
+ //}
+ // associate connection with topics
+ e.storage.Store(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ err = e.pubsub.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ // handle leave
+ case commands.Leave:
+ // remove associated connections from the storage
+ e.storage.Remove(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.pubsub.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ case commands.Headers:
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command())
+ }
+ }
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
new file mode 100644
index 00000000..a247da69
--- /dev/null
+++ b/plugins/websockets/plugin.go
@@ -0,0 +1,206 @@
+package websockets
+
+import (
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+const (
+ PluginName string = "websockets"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ // Collection with all available pubsubs
+ pubsubs map[string]pubsub.PubSub
+
+ Config *Config
+ log logger.Logger
+
+ // global connections map
+ connections sync.Map
+ storage *storage.Storage
+
+ workersPool *pool.WorkersPool
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("websockets_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.Config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.pubsubs = make(map[string]pubsub.PubSub)
+ p.log = log
+ p.storage = storage.NewStorage()
+ p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+ go func() {
+ ps := p.pubsubs["redis"]
+
+ for {
+ // get message
+ // get topic
+ // get connection uuid from the storage by the topic
+ // write payload into the connection
+ // do that in the workers pool
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
+
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }()
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ p.workersPool.Stop()
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetPublishers,
+ }
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// GetPublishers collects all pubsubs
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
+ p.pubsubs[name.Name()] = pub
+}
+
+func (p *Plugin) Middleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != p.Config.Path {
+ next.ServeHTTP(w, r)
+ return
+ }
+
+ // connection upgrader
+ upgraded := websocket.Upgrader{
+ HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 0,
+ WriteBufferSize: 0,
+ WriteBufferPool: nil,
+ Subprotocols: nil,
+ Error: nil,
+ CheckOrigin: nil,
+ EnableCompression: false,
+ }
+
+ // upgrade connection to websocket connection
+ _conn, err := upgraded.Upgrade(w, r, nil)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ safeConn := connection.NewConnection(_conn, p.log)
+ defer func() {
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close error", "error", err)
+ }
+ }()
+
+ // generate UUID from the connection
+ connectionID := uuid.NewString()
+ // store connection
+ p.connections.Store(connectionID, safeConn)
+ // when exiting - delete the connection
+ defer func() {
+ p.connections.Delete(connectionID)
+ }()
+
+ // Executor wraps a connection to have a safe abstraction
+ p.Lock()
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ p.Unlock()
+
+ p.log.Info("websocket client connected", "uuid", connectionID)
+
+ err = e.StartCommandLoop()
+ if err != nil {
+ p.log.Error("command loop error", "error", err.Error())
+ return
+ }
+ })
+}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker()]; ok {
+ err := br.Publish(msg)
+ if err != nil {
+ return errors.E(err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker())
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ err := p.pubsubs[msg[i].Broker()].Publish(msg)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
new file mode 100644
index 00000000..ee31d62f
--- /dev/null
+++ b/plugins/websockets/pool/workers_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type WorkersPool struct {
+ storage *storage.Storage
+ connections *sync.Map
+ resPool sync.Pool
+ log logger.Logger
+
+ queue chan pubsub.Message
+ exit chan struct{}
+}
+
+func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+ wp := &WorkersPool{
+ connections: connections,
+ queue: make(chan pubsub.Message, 100),
+ storage: storage,
+ log: log,
+ exit: make(chan struct{}),
+ }
+
+ wp.resPool.New = func() interface{} {
+ return make(map[string]struct{}, 10)
+ }
+
+ for i := 0; i < 10; i++ {
+ wp.do()
+ }
+
+ return wp
+}
+
+func (wp *WorkersPool) Queue(msg pubsub.Message) {
+ wp.queue <- msg
+}
+
+func (wp *WorkersPool) Stop() {
+ for i := 0; i < 10; i++ {
+ wp.exit <- struct{}{}
+ }
+
+ close(wp.exit)
+}
+
+func (wp *WorkersPool) put(res map[string]struct{}) {
+ // optimized
+ // https://go-review.googlesource.com/c/go/+/110055/
+ // not O(n), but O(1)
+ for k := range res {
+ delete(res, k)
+ }
+}
+
+func (wp *WorkersPool) get() map[string]struct{} {
+ return wp.resPool.Get().(map[string]struct{})
+}
+
+func (wp *WorkersPool) do() {
+ go func() {
+ for {
+ select {
+ case msg := <-wp.queue:
+ res := wp.get()
+ // get connections for the particular topic
+ wp.storage.Get(msg.Topics(), res)
+ if len(res) == 0 {
+ wp.log.Info("no such topic", "topic", msg.Topics())
+ wp.put(res)
+ continue
+ }
+
+ for i := range res {
+ c, ok := wp.connections.Load(i)
+ if !ok {
+ panic("not ok here (((")
+ }
+
+ conn := c.(*connection.Connection)
+ err := conn.Write(websocket.BinaryMessage, msg.Payload())
+ if err != nil {
+ // TODO handle error
+ wp.put(res)
+ continue
+ }
+ }
+
+ wp.put(res)
+ case <-wp.exit:
+ wp.log.Info("get exit signal, exiting from the workers pool")
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..0f0303b7
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,46 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+ err := r.plugin.Publish(mi)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error {
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+
+ r.plugin.PublishAsync(mi)
+
+ *ok = true
+ return nil
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
new file mode 100644
index 00000000..a7e49207
--- /dev/null
+++ b/plugins/websockets/storage/storage.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Storage struct {
+ sync.RWMutex
+ BST bst.Storage
+}
+
+func NewStorage() *Storage {
+ return &Storage{
+ BST: bst.NewBST(),
+ }
+}
+
+func (s *Storage) Store(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Insert(connID, topics[i])
+ }
+}
+
+func (s *Storage) Remove(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Remove(connID, topics[i])
+ }
+}
+
+func (s *Storage) Get(topics []string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
new file mode 100644
index 00000000..9d9522d4
--- /dev/null
+++ b/plugins/websockets/validator/access_validator.go
@@ -0,0 +1,102 @@
+package validator
+
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+)
+
+type AccessValidator struct {
+ buffer *bytes.Buffer
+ header http.Header
+ status int
+}
+
+func NewValidator() *AccessValidator {
+ return &AccessValidator{
+ buffer: bytes.NewBuffer(nil),
+ header: make(http.Header),
+ }
+}
+
+// Copy all content to parent response writer.
+func (w *AccessValidator) Copy(rw http.ResponseWriter) {
+ rw.WriteHeader(w.status)
+
+ for k, v := range w.header {
+ for _, vv := range v {
+ rw.Header().Add(k, vv)
+ }
+ }
+
+ _, _ = io.Copy(rw, w.buffer)
+}
+
+// Header returns the header map that will be sent by WriteHeader.
+func (w *AccessValidator) Header() http.Header {
+ return w.header
+}
+
+// Write writes the data to the connection as part of an HTTP reply.
+func (w *AccessValidator) Write(p []byte) (int, error) {
+ return w.buffer.Write(p)
+}
+
+// WriteHeader sends an HTTP response header with the provided status code.
+func (w *AccessValidator) WriteHeader(statusCode int) {
+ w.status = statusCode
+}
+
+// IsOK returns true if response contained 200 status code.
+func (w *AccessValidator) IsOK() bool {
+ return w.status == 200
+}
+
+// Body returns response body to rely to user.
+func (w *AccessValidator) Body() []byte {
+ return w.buffer.Bytes()
+}
+
+// Error contains server response.
+func (w *AccessValidator) Error() string {
+ return w.buffer.String()
+}
+
+// AssertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in
+// case of error
+func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error {
+ if err := attributes.Set(r, "ws:joinServer", true); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinServer")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
+
+// AssertTopicsAccess checks if user can access given upstream, the application will receive all user headers and cookies.
+// the decision to authorize user will be based on response code (200).
+func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
+ if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinTopics")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go
new file mode 100644
index 00000000..4a07b00f
--- /dev/null
+++ b/plugins/websockets/validator/access_validator_test.go
@@ -0,0 +1,35 @@
+package validator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResponseWrapper_Body(t *testing.T) {
+ w := NewValidator()
+ _, _ = w.Write([]byte("hello"))
+
+ assert.Equal(t, []byte("hello"), w.Body())
+}
+
+func TestResponseWrapper_Header(t *testing.T) {
+ w := NewValidator()
+ w.Header().Set("k", "value")
+
+ assert.Equal(t, "value", w.Header().Get("k"))
+}
+
+func TestResponseWrapper_StatusCode(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(200)
+
+ assert.True(t, w.IsOK())
+}
+
+func TestResponseWrapper_StatusCodeBad(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(400)
+
+ assert.False(t, w.IsOK())
+}
diff --git a/tests/Dockerfile b/tests/Dockerfile
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/Dockerfile
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 67d5476b..a8e8a7c5 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -7,5 +7,8 @@ services:
- "0.0.0.0:11211:11211"
redis:
image: redis:6
+ mem_limit: 16384m
+ mem_reservation: 2048M
+ cpus: 8
ports:
- "6379:6379"
diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml
index e1edbb44..94c9a856 100644
--- a/tests/plugins/informer/.rr-informer.yaml
+++ b/tests/plugins/informer/.rr-informer.yaml
@@ -3,8 +3,8 @@ server:
user: ""
group: ""
env:
- "RR_CONFIG": "/some/place/on/the/C134"
- "RR_CONFIG2": "C138"
+ - RR_CONFIG: "/some/place/on/the/C134"
+ - RR_CONFIG: "C138"
relay: "pipes"
relay_timeout: "20s"
@@ -12,4 +12,4 @@ rpc:
listen: tcp://127.0.0.1:6001
logs:
mode: development
- level: error \ No newline at end of file
+ level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
new file mode 100644
index 00000000..9973b2dc
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -0,0 +1,50 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:15395
+ max_request_size: 1024
+ middleware: ["websockets"]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs: [ "redis" ]
+
+ # sample of the own config section for the redis pubsub driver
+
+ redis:
+ addrs:
+ - localhost:1111
+ # path used as websockets path
+ path: "/ws"
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
new file mode 100644
index 00000000..a9b90fd0
--- /dev/null
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -0,0 +1,102 @@
+package websockets
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1000)
+ t.Run("test1", test1)
+ t.Run("test2", test2)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func test1(t *testing.T) {
+
+}
+
+func test2(t *testing.T) {
+
+}