diff options
Diffstat (limited to 'plugins/broadcast')
24 files changed, 0 insertions, 922 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go deleted file mode 100644 index 5e7b7f20..00000000 --- a/plugins/broadcast/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package broadcast - -/* -broadcast: - ws-us-region-1: - subscriber: websockets - middleware: ["headers", "gzip"] # ???? - address: "localhost:53223" - path: "/ws" - - storage: redis - address: - - 6379 - db: 0 -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // Sections represent particular broadcast plugin section - Sections map[string]interface{} `mapstructure:"sections"` -} - -func (c *Config) InitDefaults() { - -} - -func (c *Config) Valid() error { - return nil -} diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml deleted file mode 100644 index 8b0eef20..00000000 --- a/plugins/broadcast/doc/.rr-broadcast.yaml +++ /dev/null @@ -1,29 +0,0 @@ -broadcast: - # path to enable web-socket handler middleware - path: /ws - - # optional, redis broker configuration - redis: - addrs: - - "localhost:6379" - # if a MasterName is passed a sentinel-backed FailoverClient will be returned - master_name: "" - username: "" - password: "" - db: 0 - sentinel_password: "" - route_by_latency: false - route_randomly: false - dial_timeout: 0 # accepted values [1s, 5m, 3h] - max_retries: 1 - min_retry_backoff: 0 # accepted values [1s, 5m, 3h] - max_retry_backoff: 0 # accepted values [1s, 5m, 3h] - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 # accepted values [1s, 5m, 3h] - read_timeout: 0 # accepted values [1s, 5m, 3h] - write_timeout: 0 # accepted values [1s, 5m, 3h] - pool_timeout: 0 # accepted values [1s, 5m, 3h] - idle_timeout: 0 # accepted values [1s, 5m, 3h] - idle_check_freq: 0 # accepted values [1s, 5m, 3h] - read_only: false diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio deleted file mode 100644 index 2339f5b1..00000000 --- a/plugins/broadcast/doc/broadcast.drawio +++ /dev/null @@ -1,151 +0,0 @@ -<mxfile> - <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1"> - <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0"> - <root> - <mxCell id="0"/> - <mxCell id="1" parent="0"/> - <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"> - <mxPoint x="620" y="440" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="460" y="240"/> - <mxPoint x="270" y="240"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1"> - <mxGeometry x="0.0526" y="10" relative="1" as="geometry"> - <mxPoint x="459.72413793103465" y="510" as="targetPoint"/> - <mxPoint x="-10" y="10" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="240" y="160"/> - <mxPoint x="696" y="160"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0"> - <mxGeometry x="-0.2172" relative="1" as="geometry"> - <mxPoint x="108.62" y="-10" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1"> - <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1"> - <mxGeometry relative="1" as="geometry"> - <mxPoint x="50" y="440.2068965517242" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> - <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry x="0.1429" y="15" relative="1" as="geometry"> - <mxPoint as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0"> - <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry"> - <mxPoint x="-33.3" y="-13" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1"> - <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0"> - <mxGeometry x="0.2355" y="-1" relative="1" as="geometry"> - <mxPoint x="9" y="12.69" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1"> - <mxGeometry x="0.2" y="-10" relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="680" y="620"/> - <mxPoint x="680" y="620"/> - </Array> - <mxPoint as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> - <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="<p style="margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline"><b>Structure</b></p><hr><p style="margin: 0px ; margin-left: 8px">Topic = string<br>Payload = raw<br></p><p style="margin: 0px ; margin-left: 8px">Broker = ws (from config) (hardcoded)<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1"> - <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1"> - <mxGeometry width="50" height="50" relative="1" as="geometry"> - <mxPoint x="740" y="608" as="sourcePoint"/> - <mxPoint x="790" y="558" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="160" y="520"/> - <mxPoint x="160" y="380"/> - <mxPoint x="270" y="380"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0"> - <mxGeometry x="-0.317" y="3" relative="1" as="geometry"> - <mxPoint x="-7" y="-10.34" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="<ul><li>Check registered topics</li><li>Redirect to driver</li></ul>" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1"> - <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/> - </mxCell> - </root> - </mxGraphModel> - </diagram> -</mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio deleted file mode 100644 index 739b797a..00000000 --- a/plugins/broadcast/doc/ws.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go deleted file mode 100644 index 47c779b5..00000000 --- a/plugins/broadcast/interface.go +++ /dev/null @@ -1,41 +0,0 @@ -package broadcast - -import ( - "encoding/json" -) - -// Subscriber defines the ability to operate as message passing broker. -type Subscriber interface { - // Subscribe broker to one or multiple topics. - Subscribe(topics ...string) error - // UnsubscribePattern broker from pattern. - UnsubscribePattern(pattern string) error -} - -// Storage used to store patterns and topics -type Storage interface { - // Store connection uuid associated with the provided topics - Store(uuid string, topics ...string) - // StorePattern stores pattern associated with the particular connection - StorePattern(uuid string, pattern string) - - // GetConnection returns connections for the particular pattern - GetConnection(pattern string) []string - - // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example) - Construct(key string) (Storage, error) -} - -type Publisher interface { - // Publish one or multiple Channel. - Publish(messages ...*Message) error -} - -// Message represent single message. -type Message struct { - // Topic message been pushed into. - Topic string `json:"topic"` - - // Payload to be broadcasted. Must be valid json when transferred over RPC. - Payload json.RawMessage `json:"payload"` -} diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go deleted file mode 100644 index 7d09a10f..00000000 --- a/plugins/broadcast/memory/bst/bst.go +++ /dev/null @@ -1,134 +0,0 @@ -package bst - -// BST ... -type BST struct { - // registered topic, not unique - topic string - // associated connections with the topic - uuids map[string]struct{} - - // left and right subtrees - left *BST - right *BST -} - -func NewBST() Storage { - return &BST{} -} - -// Insert uuid to the topic -func (B *BST) Insert(uuid string, topic string) { - curr := B - - for { - if curr.topic == topic { - curr.uuids[uuid] = struct{}{} - return - } - // if topic less than curr topic - if curr.topic < topic { - if curr.left == nil { - curr.left = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - // move forward - curr = curr.left - } else { - if curr.right == nil { - curr.right = &BST{ - topic: topic, - uuids: map[string]struct{}{uuid: {}}, - } - return - } - - curr = curr.right - } - } -} - -func (B *BST) Get(topic string) map[string]struct{} { - curr := B - for curr != nil { - if curr.topic == topic { - return curr.uuids - } - if curr.topic < topic { - curr = curr.left - } - if curr.topic > topic { - curr = curr.right - } - } - - return nil -} - -func (B *BST) Remove(uuid string, topic string) { - B.removeHelper(uuid, topic, nil) -} - -func (B *BST) removeHelper(uuid string, topic string, parent *BST) { - curr := B - for curr != nil { - if topic < curr.topic { - parent = curr - curr = curr.left - } else if topic > curr.topic { - parent = curr - curr = curr.right - } else { - if len(curr.uuids) > 1 { - if _, ok := curr.uuids[uuid]; ok { - delete(curr.uuids, uuid) - return - } - } - - if curr.left != nil && curr.right != nil { - curr.topic, curr.uuids = curr.right.traverseForMinString() - curr.right.removeHelper(curr.topic, uuid, curr) - } else if parent == nil { - if curr.left != nil { - curr.topic = curr.left.topic - curr.uuids = curr.left.uuids - - curr.right = curr.left.right - curr.left = curr.left.left - } else if curr.right != nil { - curr.topic = curr.right.topic - curr.uuids = curr.right.uuids - - curr.left = curr.right.left - curr.right = curr.right.right - } else { - // single node tree - } - } else if parent.left == curr { - if curr.left != nil { - parent.left = curr.left - } else { - parent.left = curr.right - } - } else if parent.right == curr { - if curr.left != nil { - parent.right = curr.left - } else { - parent.right = curr.right - } - } - break - } - } -} - -//go:inline -func (B *BST) traverseForMinString() (string, map[string]struct{}) { - if B.left == nil { - return B.topic, B.uuids - } - return B.left.traverseForMinString() -} diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go deleted file mode 100644 index b5ad6c10..00000000 --- a/plugins/broadcast/memory/bst/bst_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package bst - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" -) - -func TestNewBST(t *testing.T) { - g := NewBST() - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments2") - } - - for i := 0; i < 100; i++ { - g.Insert(uuid.NewString(), "comments3") - } - - exist := g.Get("comments") - assert.Len(t, exist, 100) - - exist2 := g.Get("comments2") - assert.Len(t, exist2, 100) - - exist3 := g.Get("comments3") - assert.Len(t, exist3, 100) -} diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go deleted file mode 100644 index ecf40414..00000000 --- a/plugins/broadcast/memory/bst/interface.go +++ /dev/null @@ -1,11 +0,0 @@ -package bst - -// Storage is general in-memory BST storage implementation -type Storage interface { - // Insert inserts to a vertex with topic ident connection uuid - Insert(uuid string, topic string) - // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed - Remove(uuid, topic string) - // Get will return all connections associated with the topic - Get(topic string) map[string]struct{} -} diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go deleted file mode 100644 index e80695bc..00000000 --- a/plugins/broadcast/memory/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package memory - -// Config for the memory driver is empty, it's just a placeholder -type Config struct{} - -func (c *Config) InitDefaults() {} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go deleted file mode 100644 index 80527e4b..00000000 --- a/plugins/broadcast/memory/driver.go +++ /dev/null @@ -1,29 +0,0 @@ -package memory - -import ( - "github.com/spiral/roadrunner/v2/plugins/broadcast" -) - -type Driver struct { -} - -func NewInMemoryDriver() broadcast.Storage { - b := &Driver{} - return b -} - -func (d *Driver) Store(uuid string, topics ...string) { - panic("implement me") -} - -func (d *Driver) StorePattern(uuid string, pattern string) { - panic("implement me") -} - -func (d *Driver) GetConnection(pattern string) []string { - panic("implement me") -} - -func (d *Driver) Construct(key string) (broadcast.Storage, error) { - return nil, nil -} diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go deleted file mode 100644 index 2bd894a0..00000000 --- a/plugins/broadcast/memory/plugin.go +++ /dev/null @@ -1,67 +0,0 @@ -package memory - -import ( - "fmt" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" - SectionName string = "memory" -) - -type Plugin struct { - log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("memory_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() - - p.log = log - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("memory_plugin_serve") - errCh := make(chan error) - - return errCh -} - -func (p *Plugin) Stop() error { - - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - // broadcast.memory - return fmt.Sprintf("%s.%s", PluginName, SectionName) -} - -func (p *Plugin) Publish(msg []*broadcast.Message) error { - return nil -} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go deleted file mode 100644 index 156bea80..00000000 --- a/plugins/broadcast/plugin.go +++ /dev/null @@ -1,105 +0,0 @@ -package broadcast - -import ( - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" -) - -type Plugin struct { - broker Subscriber - driver Storage - - log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("broadcast_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() - - p.log = log - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("broadcast_plugin_serve") - errCh := make(chan error) - - // if there are no brokers, return nil - if p.broker == nil { - errCh <- errors.E(op, errors.Str("no broker detected")) - return errCh - } - - if p.driver == nil { - // Or if no storage detected, use in-memory storage - errCh <- errors.E(op, errors.Str("no storage detected")) - return errCh - } - - // start the underlying broker - go func() { - // err := p.broker.Serve() - // if err != nil { - // errCh <- errors.E(op, err) - // } - }() - - return errCh -} - -func (p *Plugin) Stop() error { - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectSubscriber, - } -} - -func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) { - p.broker = subscriber -} - -func (p *Plugin) CollectStorage(name endure.Named, storage Storage) { - p.driver = storage -} - -func (p *Plugin) RPC() interface{} { - // create an RPC service for the collects - r := &rpc{ - log: p.log, - svc: p, - } - return r -} - -func (p *Plugin) Publish(msg []*Message) error { - const op = errors.Op("broadcast_plugin_publish") - return nil -} diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go deleted file mode 100644 index 556d5f03..00000000 --- a/plugins/broadcast/redis/driver.go +++ /dev/null @@ -1,29 +0,0 @@ -package redis - -import ( - "github.com/spiral/roadrunner/v2/plugins/broadcast" -) - -type Driver struct { -} - -func NewInMemoryDriver() broadcast.Storage { - b := &Driver{} - return b -} - -func (d *Driver) Store(uuid string, topics ...string) { - panic("implement me") -} - -func (d *Driver) StorePattern(uuid string, pattern string) { - panic("implement me") -} - -func (d *Driver) GetConnection(pattern string) []string { - panic("implement me") -} - -func (d *Driver) Construct(key string) (broadcast.Storage, error) { - panic("implement me") -} diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go deleted file mode 100644 index 65a229e1..00000000 --- a/plugins/broadcast/redis/plugin.go +++ /dev/null @@ -1 +0,0 @@ -package redis diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go deleted file mode 100644 index 65a229e1..00000000 --- a/plugins/broadcast/redis/storage.go +++ /dev/null @@ -1 +0,0 @@ -package redis diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go deleted file mode 100644 index 948fd7ae..00000000 --- a/plugins/broadcast/rpc.go +++ /dev/null @@ -1,26 +0,0 @@ -package broadcast - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type rpc struct { - log logger.Logger - svc *Plugin -} - -func (r *rpc) Publish(msg []*Message, ok *bool) error { - const op = errors.Op("broadcast_publish") - err := r.svc.Publish(msg) - if err != nil { - *ok = false - return errors.E(op, err) - } - *ok = true - return nil -} - -func (r *rpc) PublishAsync() { - -} diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go deleted file mode 100644 index 25943f0a..00000000 --- a/plugins/broadcast/ws/commands/join.go +++ /dev/null @@ -1,10 +0,0 @@ -package commands - -// Join command to save the connection -type Join struct { - Command string `mapstructure:"command"` -} - -func JoinCommand() { - -} diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/leave.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/subscribe.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go deleted file mode 100644 index 1d4132b4..00000000 --- a/plugins/broadcast/ws/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package ws - -/* -broadcast: - ws-us-region-1: - subscriber: ws - path: "/ws" - - driver: redis - address: - - 6379 - db: 0 -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // http path for the websocket - Path string `mapstructure:"Path"` -} - -// InitDefault initialize default values for the ws config -func (c *Config) InitDefault() { - if c.Path == "" { - c.Path = "/ws" - } -} diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/broadcast/ws/connection/connection.go deleted file mode 100644 index cfb47e35..00000000 --- a/plugins/broadcast/ws/connection/connection.go +++ /dev/null @@ -1,69 +0,0 @@ -package connection - -import ( - "sync" - - "github.com/gofiber/websocket/v2" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// Connection represents wrapped and safe to use from the different threads websocket connection -type Connection struct { - sync.RWMutex - log logger.Logger - conn *websocket.Conn -} - -func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { - return &Connection{ - conn: wsConn, - log: log, - } -} - -func (c *Connection) Write(mt int, data []byte) error { - c.Lock() - defer c.Unlock() - - const op = errors.Op("websocket_write") - // handle a case when a goroutine tried to write into the closed connection - defer func() { - if r := recover(); r != nil { - c.log.Warn("panic handled, tried to write into the closed connection") - } - }() - - err := c.conn.WriteMessage(mt, data) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *Connection) Read() (int, []byte, error) { - c.RLock() - defer c.RUnlock() - const op = errors.Op("websocket_read") - - mt, data, err := c.conn.ReadMessage() - if err != nil { - return -1, nil, errors.E(op, err) - } - - return mt, data, nil -} - -func (c *Connection) Close() error { - c.Lock() - defer c.Unlock() - const op = errors.Op("websocket_close") - - err := c.conn.Close() - if err != nil { - return errors.E(op, err) - } - - return nil -} diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go deleted file mode 100644 index f075864b..00000000 --- a/plugins/broadcast/ws/plugin.go +++ /dev/null @@ -1,59 +0,0 @@ -package ws - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - RootPluginName = "broadcast" - PluginName = "websockets" -) - -type Plugin struct { - // logger - log logger.Logger - // configurer plugin - cfg config.Configurer -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("ws_plugin_init") - - // check for the configuration section existence - if !cfg.Has(RootPluginName) { - return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration")) - } - - p.cfg = cfg - p.log = log - - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Provides Provide a ws implementation -func (p *Plugin) Provides() []interface{} { - return []interface{}{ - p.Websocket, - } -} - -// Websocket method should provide the Subscriber implementation to the broadcast -func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) { - const op = errors.Op("websocket_subscriber_provide") - // initialize subscriber with the storage - ws, err := NewWSSubscriber(storage) - if err != nil { - return nil, errors.E(op, err) - } - - return ws, nil -} - -func (p *Plugin) Available() {} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go deleted file mode 100644 index 660efdca..00000000 --- a/plugins/broadcast/ws/subscriber.go +++ /dev/null @@ -1,50 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection" -) - -type Subscriber struct { - connections map[string]*connection.Connection - storage broadcast.Storage -} - -// config -// -func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) { - m := make(map[string]*connection.Connection) - - go func() { - app := fiber.New() - app.Use("/ws", wsMiddleware) - app.Listen(":8080") - }() - - return &Subscriber{ - connections: m, - storage: storage, - }, nil -} - -func (s *Subscriber) Subscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) SubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Unsubscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) UnsubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Publish(messages ...*broadcast.Message) error { - s.storage.GetConnection(messages[9].Topic) - return nil -} diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go deleted file mode 100644 index 068ef9fb..00000000 --- a/plugins/broadcast/ws/ws_middleware.go +++ /dev/null @@ -1,13 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/gofiber/websocket/v2" -) - -func wsMiddleware(c *fiber.Ctx) error { - if websocket.IsWebSocketUpgrade(c) { - return c.Next() - } - return fiber.ErrUpgradeRequired -} |