summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go29
-rw-r--r--plugins/broadcast/doc/.rr-broadcast.yaml29
-rw-r--r--plugins/broadcast/doc/broadcast.drawio151
-rw-r--r--plugins/broadcast/doc/ws.drawio1
-rw-r--r--plugins/broadcast/interface.go41
-rw-r--r--plugins/broadcast/memory/bst/bst.go134
-rw-r--r--plugins/broadcast/memory/bst/bst_test.go33
-rw-r--r--plugins/broadcast/memory/bst/interface.go11
-rw-r--r--plugins/broadcast/memory/config.go6
-rw-r--r--plugins/broadcast/memory/driver.go29
-rw-r--r--plugins/broadcast/memory/plugin.go67
-rw-r--r--plugins/broadcast/plugin.go105
-rw-r--r--plugins/broadcast/redis/driver.go29
-rw-r--r--plugins/broadcast/redis/plugin.go1
-rw-r--r--plugins/broadcast/redis/storage.go1
-rw-r--r--plugins/broadcast/rpc.go26
-rw-r--r--plugins/broadcast/ws/commands/join.go10
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/config.go26
-rw-r--r--plugins/broadcast/ws/connection/connection.go69
-rw-r--r--plugins/broadcast/ws/plugin.go59
-rw-r--r--plugins/broadcast/ws/subscriber.go50
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
24 files changed, 0 insertions, 922 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
deleted file mode 100644
index 5e7b7f20..00000000
--- a/plugins/broadcast/config.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package broadcast
-
-/*
-broadcast:
- ws-us-region-1:
- subscriber: websockets
- middleware: ["headers", "gzip"] # ????
- address: "localhost:53223"
- path: "/ws"
-
- storage: redis
- address:
- - 6379
- db: 0
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // Sections represent particular broadcast plugin section
- Sections map[string]interface{} `mapstructure:"sections"`
-}
-
-func (c *Config) InitDefaults() {
-
-}
-
-func (c *Config) Valid() error {
- return nil
-}
diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml
deleted file mode 100644
index 8b0eef20..00000000
--- a/plugins/broadcast/doc/.rr-broadcast.yaml
+++ /dev/null
@@ -1,29 +0,0 @@
-broadcast:
- # path to enable web-socket handler middleware
- path: /ws
-
- # optional, redis broker configuration
- redis:
- addrs:
- - "localhost:6379"
- # if a MasterName is passed a sentinel-backed FailoverClient will be returned
- master_name: ""
- username: ""
- password: ""
- db: 0
- sentinel_password: ""
- route_by_latency: false
- route_randomly: false
- dial_timeout: 0 # accepted values [1s, 5m, 3h]
- max_retries: 1
- min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
- max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
- pool_size: 0
- min_idle_conns: 0
- max_conn_age: 0 # accepted values [1s, 5m, 3h]
- read_timeout: 0 # accepted values [1s, 5m, 3h]
- write_timeout: 0 # accepted values [1s, 5m, 3h]
- pool_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_timeout: 0 # accepted values [1s, 5m, 3h]
- idle_check_freq: 0 # accepted values [1s, 5m, 3h]
- read_only: false
diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio
deleted file mode 100644
index 2339f5b1..00000000
--- a/plugins/broadcast/doc/broadcast.drawio
+++ /dev/null
@@ -1,151 +0,0 @@
-<mxfile>
- <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">
- <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0">
- <root>
- <mxCell id="0"/>
- <mxCell id="1" parent="0"/>
- <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry">
- <mxPoint x="620" y="440" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="460" y="240"/>
- <mxPoint x="270" y="240"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1">
- <mxGeometry x="0.0526" y="10" relative="1" as="geometry">
- <mxPoint x="459.72413793103465" y="510" as="targetPoint"/>
- <mxPoint x="-10" y="10" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="240" y="160"/>
- <mxPoint x="696" y="160"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0">
- <mxGeometry x="-0.2172" relative="1" as="geometry">
- <mxPoint x="108.62" y="-10" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1">
- <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1">
- <mxGeometry relative="1" as="geometry">
- <mxPoint x="50" y="440.2068965517242" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
- <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry x="0.1429" y="15" relative="1" as="geometry">
- <mxPoint as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0">
- <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry">
- <mxPoint x="-33.3" y="-13" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1">
- <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0">
- <mxGeometry x="0.2355" y="-1" relative="1" as="geometry">
- <mxPoint x="9" y="12.69" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1">
- <mxGeometry x="0.2" y="-10" relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="680" y="620"/>
- <mxPoint x="680" y="620"/>
- </Array>
- <mxPoint as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
- <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&amp;gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
- <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="&lt;p style=&quot;margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline&quot;&gt;&lt;b&gt;Structure&lt;/b&gt;&lt;/p&gt;&lt;hr&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Topic = string&lt;br&gt;Payload = raw&lt;br&gt;&lt;/p&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Broker = ws (from config) (hardcoded)&lt;br&gt;&lt;/p&gt;" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1">
- <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1">
- <mxGeometry width="50" height="50" relative="1" as="geometry">
- <mxPoint x="740" y="608" as="sourcePoint"/>
- <mxPoint x="790" y="558" as="targetPoint"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1">
- <mxGeometry relative="1" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
- <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1">
- <mxGeometry relative="1" as="geometry">
- <Array as="points">
- <mxPoint x="160" y="520"/>
- <mxPoint x="160" y="380"/>
- <mxPoint x="270" y="380"/>
- </Array>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0">
- <mxGeometry x="-0.317" y="3" relative="1" as="geometry">
- <mxPoint x="-7" y="-10.34" as="offset"/>
- </mxGeometry>
- </mxCell>
- <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="&lt;ul&gt;&lt;li&gt;Check registered topics&lt;/li&gt;&lt;li&gt;Redirect to driver&lt;/li&gt;&lt;/ul&gt;" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1">
- <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/>
- </mxCell>
- </root>
- </mxGraphModel>
- </diagram>
-</mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio
deleted file mode 100644
index 739b797a..00000000
--- a/plugins/broadcast/doc/ws.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
deleted file mode 100644
index 47c779b5..00000000
--- a/plugins/broadcast/interface.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package broadcast
-
-import (
- "encoding/json"
-)
-
-// Subscriber defines the ability to operate as message passing broker.
-type Subscriber interface {
- // Subscribe broker to one or multiple topics.
- Subscribe(topics ...string) error
- // UnsubscribePattern broker from pattern.
- UnsubscribePattern(pattern string) error
-}
-
-// Storage used to store patterns and topics
-type Storage interface {
- // Store connection uuid associated with the provided topics
- Store(uuid string, topics ...string)
- // StorePattern stores pattern associated with the particular connection
- StorePattern(uuid string, pattern string)
-
- // GetConnection returns connections for the particular pattern
- GetConnection(pattern string) []string
-
- // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example)
- Construct(key string) (Storage, error)
-}
-
-type Publisher interface {
- // Publish one or multiple Channel.
- Publish(messages ...*Message) error
-}
-
-// Message represent single message.
-type Message struct {
- // Topic message been pushed into.
- Topic string `json:"topic"`
-
- // Payload to be broadcasted. Must be valid json when transferred over RPC.
- Payload json.RawMessage `json:"payload"`
-}
diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go
deleted file mode 100644
index 7d09a10f..00000000
--- a/plugins/broadcast/memory/bst/bst.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package bst
-
-// BST ...
-type BST struct {
- // registered topic, not unique
- topic string
- // associated connections with the topic
- uuids map[string]struct{}
-
- // left and right subtrees
- left *BST
- right *BST
-}
-
-func NewBST() Storage {
- return &BST{}
-}
-
-// Insert uuid to the topic
-func (B *BST) Insert(uuid string, topic string) {
- curr := B
-
- for {
- if curr.topic == topic {
- curr.uuids[uuid] = struct{}{}
- return
- }
- // if topic less than curr topic
- if curr.topic < topic {
- if curr.left == nil {
- curr.left = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
- // move forward
- curr = curr.left
- } else {
- if curr.right == nil {
- curr.right = &BST{
- topic: topic,
- uuids: map[string]struct{}{uuid: {}},
- }
- return
- }
-
- curr = curr.right
- }
- }
-}
-
-func (B *BST) Get(topic string) map[string]struct{} {
- curr := B
- for curr != nil {
- if curr.topic == topic {
- return curr.uuids
- }
- if curr.topic < topic {
- curr = curr.left
- }
- if curr.topic > topic {
- curr = curr.right
- }
- }
-
- return nil
-}
-
-func (B *BST) Remove(uuid string, topic string) {
- B.removeHelper(uuid, topic, nil)
-}
-
-func (B *BST) removeHelper(uuid string, topic string, parent *BST) {
- curr := B
- for curr != nil {
- if topic < curr.topic {
- parent = curr
- curr = curr.left
- } else if topic > curr.topic {
- parent = curr
- curr = curr.right
- } else {
- if len(curr.uuids) > 1 {
- if _, ok := curr.uuids[uuid]; ok {
- delete(curr.uuids, uuid)
- return
- }
- }
-
- if curr.left != nil && curr.right != nil {
- curr.topic, curr.uuids = curr.right.traverseForMinString()
- curr.right.removeHelper(curr.topic, uuid, curr)
- } else if parent == nil {
- if curr.left != nil {
- curr.topic = curr.left.topic
- curr.uuids = curr.left.uuids
-
- curr.right = curr.left.right
- curr.left = curr.left.left
- } else if curr.right != nil {
- curr.topic = curr.right.topic
- curr.uuids = curr.right.uuids
-
- curr.left = curr.right.left
- curr.right = curr.right.right
- } else {
- // single node tree
- }
- } else if parent.left == curr {
- if curr.left != nil {
- parent.left = curr.left
- } else {
- parent.left = curr.right
- }
- } else if parent.right == curr {
- if curr.left != nil {
- parent.right = curr.left
- } else {
- parent.right = curr.right
- }
- }
- break
- }
- }
-}
-
-//go:inline
-func (B *BST) traverseForMinString() (string, map[string]struct{}) {
- if B.left == nil {
- return B.topic, B.uuids
- }
- return B.left.traverseForMinString()
-}
diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go
deleted file mode 100644
index b5ad6c10..00000000
--- a/plugins/broadcast/memory/bst/bst_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package bst
-
-import (
- "testing"
-
- "github.com/google/uuid"
- "github.com/stretchr/testify/assert"
-)
-
-func TestNewBST(t *testing.T) {
- g := NewBST()
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments2")
- }
-
- for i := 0; i < 100; i++ {
- g.Insert(uuid.NewString(), "comments3")
- }
-
- exist := g.Get("comments")
- assert.Len(t, exist, 100)
-
- exist2 := g.Get("comments2")
- assert.Len(t, exist2, 100)
-
- exist3 := g.Get("comments3")
- assert.Len(t, exist3, 100)
-}
diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go
deleted file mode 100644
index ecf40414..00000000
--- a/plugins/broadcast/memory/bst/interface.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package bst
-
-// Storage is general in-memory BST storage implementation
-type Storage interface {
- // Insert inserts to a vertex with topic ident connection uuid
- Insert(uuid string, topic string)
- // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
- Remove(uuid, topic string)
- // Get will return all connections associated with the topic
- Get(topic string) map[string]struct{}
-}
diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go
deleted file mode 100644
index e80695bc..00000000
--- a/plugins/broadcast/memory/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package memory
-
-// Config for the memory driver is empty, it's just a placeholder
-type Config struct{}
-
-func (c *Config) InitDefaults() {}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
deleted file mode 100644
index 80527e4b..00000000
--- a/plugins/broadcast/memory/driver.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package memory
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
-)
-
-type Driver struct {
-}
-
-func NewInMemoryDriver() broadcast.Storage {
- b := &Driver{}
- return b
-}
-
-func (d *Driver) Store(uuid string, topics ...string) {
- panic("implement me")
-}
-
-func (d *Driver) StorePattern(uuid string, pattern string) {
- panic("implement me")
-}
-
-func (d *Driver) GetConnection(pattern string) []string {
- panic("implement me")
-}
-
-func (d *Driver) Construct(key string) (broadcast.Storage, error) {
- return nil, nil
-}
diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go
deleted file mode 100644
index 2bd894a0..00000000
--- a/plugins/broadcast/memory/plugin.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package memory
-
-import (
- "fmt"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "broadcast"
- SectionName string = "memory"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("memory_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
-
- p.cfg.InitDefaults()
-
- p.log = log
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("memory_plugin_serve")
- errCh := make(chan error)
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
-
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- // broadcast.memory
- return fmt.Sprintf("%s.%s", PluginName, SectionName)
-}
-
-func (p *Plugin) Publish(msg []*broadcast.Message) error {
- return nil
-}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
deleted file mode 100644
index 156bea80..00000000
--- a/plugins/broadcast/plugin.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package broadcast
-
-import (
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "broadcast"
-)
-
-type Plugin struct {
- broker Subscriber
- driver Storage
-
- log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("broadcast_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
-
- p.cfg.InitDefaults()
-
- p.log = log
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("broadcast_plugin_serve")
- errCh := make(chan error)
-
- // if there are no brokers, return nil
- if p.broker == nil {
- errCh <- errors.E(op, errors.Str("no broker detected"))
- return errCh
- }
-
- if p.driver == nil {
- // Or if no storage detected, use in-memory storage
- errCh <- errors.E(op, errors.Str("no storage detected"))
- return errCh
- }
-
- // start the underlying broker
- go func() {
- // err := p.broker.Serve()
- // if err != nil {
- // errCh <- errors.E(op, err)
- // }
- }()
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.CollectSubscriber,
- }
-}
-
-func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) {
- p.broker = subscriber
-}
-
-func (p *Plugin) CollectStorage(name endure.Named, storage Storage) {
- p.driver = storage
-}
-
-func (p *Plugin) RPC() interface{} {
- // create an RPC service for the collects
- r := &rpc{
- log: p.log,
- svc: p,
- }
- return r
-}
-
-func (p *Plugin) Publish(msg []*Message) error {
- const op = errors.Op("broadcast_plugin_publish")
- return nil
-}
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
deleted file mode 100644
index 556d5f03..00000000
--- a/plugins/broadcast/redis/driver.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package redis
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
-)
-
-type Driver struct {
-}
-
-func NewInMemoryDriver() broadcast.Storage {
- b := &Driver{}
- return b
-}
-
-func (d *Driver) Store(uuid string, topics ...string) {
- panic("implement me")
-}
-
-func (d *Driver) StorePattern(uuid string, pattern string) {
- panic("implement me")
-}
-
-func (d *Driver) GetConnection(pattern string) []string {
- panic("implement me")
-}
-
-func (d *Driver) Construct(key string) (broadcast.Storage, error) {
- panic("implement me")
-}
diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go
deleted file mode 100644
index 65a229e1..00000000
--- a/plugins/broadcast/redis/plugin.go
+++ /dev/null
@@ -1 +0,0 @@
-package redis
diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go
deleted file mode 100644
index 65a229e1..00000000
--- a/plugins/broadcast/redis/storage.go
+++ /dev/null
@@ -1 +0,0 @@
-package redis
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
deleted file mode 100644
index 948fd7ae..00000000
--- a/plugins/broadcast/rpc.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package broadcast
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type rpc struct {
- log logger.Logger
- svc *Plugin
-}
-
-func (r *rpc) Publish(msg []*Message, ok *bool) error {
- const op = errors.Op("broadcast_publish")
- err := r.svc.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
- }
- *ok = true
- return nil
-}
-
-func (r *rpc) PublishAsync() {
-
-}
diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go
deleted file mode 100644
index 25943f0a..00000000
--- a/plugins/broadcast/ws/commands/join.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package commands
-
-// Join command to save the connection
-type Join struct {
- Command string `mapstructure:"command"`
-}
-
-func JoinCommand() {
-
-}
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/leave.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
deleted file mode 100644
index cdff10da..00000000
--- a/plugins/broadcast/ws/commands/subscribe.go
+++ /dev/null
@@ -1 +0,0 @@
-package commands
diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go
deleted file mode 100644
index 1d4132b4..00000000
--- a/plugins/broadcast/ws/config.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package ws
-
-/*
-broadcast:
- ws-us-region-1:
- subscriber: ws
- path: "/ws"
-
- driver: redis
- address:
- - 6379
- db: 0
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // http path for the websocket
- Path string `mapstructure:"Path"`
-}
-
-// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
- if c.Path == "" {
- c.Path = "/ws"
- }
-}
diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/broadcast/ws/connection/connection.go
deleted file mode 100644
index cfb47e35..00000000
--- a/plugins/broadcast/ws/connection/connection.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package connection
-
-import (
- "sync"
-
- "github.com/gofiber/websocket/v2"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// Connection represents wrapped and safe to use from the different threads websocket connection
-type Connection struct {
- sync.RWMutex
- log logger.Logger
- conn *websocket.Conn
-}
-
-func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
- return &Connection{
- conn: wsConn,
- log: log,
- }
-}
-
-func (c *Connection) Write(mt int, data []byte) error {
- c.Lock()
- defer c.Unlock()
-
- const op = errors.Op("websocket_write")
- // handle a case when a goroutine tried to write into the closed connection
- defer func() {
- if r := recover(); r != nil {
- c.log.Warn("panic handled, tried to write into the closed connection")
- }
- }()
-
- err := c.conn.WriteMessage(mt, data)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (c *Connection) Read() (int, []byte, error) {
- c.RLock()
- defer c.RUnlock()
- const op = errors.Op("websocket_read")
-
- mt, data, err := c.conn.ReadMessage()
- if err != nil {
- return -1, nil, errors.E(op, err)
- }
-
- return mt, data, nil
-}
-
-func (c *Connection) Close() error {
- c.Lock()
- defer c.Unlock()
- const op = errors.Op("websocket_close")
-
- err := c.conn.Close()
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
deleted file mode 100644
index f075864b..00000000
--- a/plugins/broadcast/ws/plugin.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package ws
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- RootPluginName = "broadcast"
- PluginName = "websockets"
-)
-
-type Plugin struct {
- // logger
- log logger.Logger
- // configurer plugin
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("ws_plugin_init")
-
- // check for the configuration section existence
- if !cfg.Has(RootPluginName) {
- return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration"))
- }
-
- p.cfg = cfg
- p.log = log
-
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// Provides Provide a ws implementation
-func (p *Plugin) Provides() []interface{} {
- return []interface{}{
- p.Websocket,
- }
-}
-
-// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
- const op = errors.Op("websocket_subscriber_provide")
- // initialize subscriber with the storage
- ws, err := NewWSSubscriber(storage)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return ws, nil
-}
-
-func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
deleted file mode 100644
index 660efdca..00000000
--- a/plugins/broadcast/ws/subscriber.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
-)
-
-type Subscriber struct {
- connections map[string]*connection.Connection
- storage broadcast.Storage
-}
-
-// config
-//
-func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
- m := make(map[string]*connection.Connection)
-
- go func() {
- app := fiber.New()
- app.Use("/ws", wsMiddleware)
- app.Listen(":8080")
- }()
-
- return &Subscriber{
- connections: m,
- storage: storage,
- }, nil
-}
-
-func (s *Subscriber) Subscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) SubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Unsubscribe(topics ...string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) UnsubscribePattern(pattern string) error {
- panic("implement me")
-}
-
-func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
- s.storage.GetConnection(messages[9].Topic)
- return nil
-}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
deleted file mode 100644
index 068ef9fb..00000000
--- a/plugins/broadcast/ws/ws_middleware.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package ws
-
-import (
- "github.com/gofiber/fiber/v2"
- "github.com/gofiber/websocket/v2"
-)
-
-func wsMiddleware(c *fiber.Ctx) error {
- if websocket.IsWebSocketUpgrade(c) {
- return c.Next()
- }
- return fiber.ErrUpgradeRequired
-}