diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 00:09:33 +0300 |
commit | dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 (patch) | |
tree | 6ba562da6de7f32a8d528b72cbb56a8bc98c1b30 | |
parent | d2e9d8320857f5768c54843a43ad16f59d6a3e8f (diff) |
- Update all main abstractions
- Desighn a new interfaces responsible for the whole PubSub
- New plugin - websockets
Signed-off-by: Valery Piashchynski <[email protected]>
47 files changed, 1302 insertions, 678 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index 4ef2f2e7..c22fbbd3 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -18,7 +18,7 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) - // Remove worker from the pool. + // RemoveWorker removes worker from the pool. RemoveWorker(worker worker.BaseProcess) error // Destroy all underlying stack (but let them to complete the task). diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go new file mode 100644 index 00000000..80dab0c3 --- /dev/null +++ b/pkg/pubsub/interface.go @@ -0,0 +1,38 @@ +package pubsub + +// PubSub ... +type PubSub interface { + Publisher + Subscriber + Reader +} + +// Subscriber defines the ability to operate as message passing broker. +type Subscriber interface { + // Subscribe broker to one or multiple topics. + Subscribe(topics ...string) error + // Unsubscribe from one or multiply topics + Unsubscribe(topics ...string) error +} + +// Publisher publish one or more messages +type Publisher interface { + // Publish one or multiple Channel. + Publish(messages []Message) error + + // PublishAsync publish message and return immediately + // If error occurred it will be printed into the logger + PublishAsync(messages []Message) +} + +// Reader interface should return next message +type Reader interface { + Next() (Message, error) +} + +type Message interface { + Command() string + Payload() []byte + Topics() []string + Broker() string +} diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go new file mode 100644 index 00000000..ab74eb98 --- /dev/null +++ b/pkg/pubsub/message.go @@ -0,0 +1,64 @@ +package pubsub + +import ( + json "github.com/json-iterator/go" +) + +type Msg struct { + // Topic message been pushed into. + T []string `json:"topic"` + + // Command (join, leave, headers) + C string `json:"command"` + + // Broker (redis, memory) + B string `json:"broker"` + + // Payload to be broadcasted + P []byte `json:"payload"` +} + +//func (m Msg) UnmarshalBinary(data []byte) error { +// //Use default gob decoder +// reader := bytes.NewReader(data) +// dec := gob.NewDecoder(reader) +// if err := dec.Decode(&m); err != nil { +// return err +// } +// +// return nil +//} + +func (m *Msg) MarshalBinary() ([]byte, error) { + //buf := new(bytes.Buffer) + // + //for i := 0; i < len(m.T); i++ { + // buf.WriteString(m.T[i]) + //} + // + //buf.WriteString(m.C) + //buf.WriteString(m.B) + //buf.Write(m.P) + + return json.Marshal(m) + +} + +// Payload in raw bytes +func (m *Msg) Payload() []byte { + return m.P +} + +// Command for the connection +func (m *Msg) Command() string { + return m.C +} + +// Topics to subscribe +func (m *Msg) Topics() []string { + return m.T +} + +func (m *Msg) Broker() string { + return m.B +} diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go deleted file mode 100644 index 5e7b7f20..00000000 --- a/plugins/broadcast/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package broadcast - -/* -broadcast: - ws-us-region-1: - subscriber: websockets - middleware: ["headers", "gzip"] # ???? - address: "localhost:53223" - path: "/ws" - - storage: redis - address: - - 6379 - db: 0 -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // Sections represent particular broadcast plugin section - Sections map[string]interface{} `mapstructure:"sections"` -} - -func (c *Config) InitDefaults() { - -} - -func (c *Config) Valid() error { - return nil -} diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml deleted file mode 100644 index 8b0eef20..00000000 --- a/plugins/broadcast/doc/.rr-broadcast.yaml +++ /dev/null @@ -1,29 +0,0 @@ -broadcast: - # path to enable web-socket handler middleware - path: /ws - - # optional, redis broker configuration - redis: - addrs: - - "localhost:6379" - # if a MasterName is passed a sentinel-backed FailoverClient will be returned - master_name: "" - username: "" - password: "" - db: 0 - sentinel_password: "" - route_by_latency: false - route_randomly: false - dial_timeout: 0 # accepted values [1s, 5m, 3h] - max_retries: 1 - min_retry_backoff: 0 # accepted values [1s, 5m, 3h] - max_retry_backoff: 0 # accepted values [1s, 5m, 3h] - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 # accepted values [1s, 5m, 3h] - read_timeout: 0 # accepted values [1s, 5m, 3h] - write_timeout: 0 # accepted values [1s, 5m, 3h] - pool_timeout: 0 # accepted values [1s, 5m, 3h] - idle_timeout: 0 # accepted values [1s, 5m, 3h] - idle_check_freq: 0 # accepted values [1s, 5m, 3h] - read_only: false diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio deleted file mode 100644 index 2339f5b1..00000000 --- a/plugins/broadcast/doc/broadcast.drawio +++ /dev/null @@ -1,151 +0,0 @@ -<mxfile> - <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1"> - <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0"> - <root> - <mxCell id="0"/> - <mxCell id="1" parent="0"/> - <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"> - <mxPoint x="620" y="440" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="460" y="240"/> - <mxPoint x="270" y="240"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1"> - <mxGeometry x="0.0526" y="10" relative="1" as="geometry"> - <mxPoint x="459.72413793103465" y="510" as="targetPoint"/> - <mxPoint x="-10" y="10" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="240" y="160"/> - <mxPoint x="696" y="160"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0"> - <mxGeometry x="-0.2172" relative="1" as="geometry"> - <mxPoint x="108.62" y="-10" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1"> - <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1"> - <mxGeometry relative="1" as="geometry"> - <mxPoint x="50" y="440.2068965517242" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> - <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry x="0.1429" y="15" relative="1" as="geometry"> - <mxPoint as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0"> - <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry"> - <mxPoint x="-33.3" y="-13" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1"> - <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0"> - <mxGeometry x="0.2355" y="-1" relative="1" as="geometry"> - <mxPoint x="9" y="12.69" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1"> - <mxGeometry x="0.2" y="-10" relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="680" y="620"/> - <mxPoint x="680" y="620"/> - </Array> - <mxPoint as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/> - </mxCell> - <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> - <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> - <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="<p style="margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline"><b>Structure</b></p><hr><p style="margin: 0px ; margin-left: 8px">Topic = string<br>Payload = raw<br></p><p style="margin: 0px ; margin-left: 8px">Broker = ws (from config) (hardcoded)<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1"> - <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1"> - <mxGeometry width="50" height="50" relative="1" as="geometry"> - <mxPoint x="740" y="608" as="sourcePoint"/> - <mxPoint x="790" y="558" as="targetPoint"/> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1"> - <mxGeometry relative="1" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> - <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1"> - <mxGeometry relative="1" as="geometry"> - <Array as="points"> - <mxPoint x="160" y="520"/> - <mxPoint x="160" y="380"/> - <mxPoint x="270" y="380"/> - </Array> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0"> - <mxGeometry x="-0.317" y="3" relative="1" as="geometry"> - <mxPoint x="-7" y="-10.34" as="offset"/> - </mxGeometry> - </mxCell> - <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="<ul><li>Check registered topics</li><li>Redirect to driver</li></ul>" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1"> - <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/> - </mxCell> - </root> - </mxGraphModel> - </diagram> -</mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go deleted file mode 100644 index 47c779b5..00000000 --- a/plugins/broadcast/interface.go +++ /dev/null @@ -1,41 +0,0 @@ -package broadcast - -import ( - "encoding/json" -) - -// Subscriber defines the ability to operate as message passing broker. -type Subscriber interface { - // Subscribe broker to one or multiple topics. - Subscribe(topics ...string) error - // UnsubscribePattern broker from pattern. - UnsubscribePattern(pattern string) error -} - -// Storage used to store patterns and topics -type Storage interface { - // Store connection uuid associated with the provided topics - Store(uuid string, topics ...string) - // StorePattern stores pattern associated with the particular connection - StorePattern(uuid string, pattern string) - - // GetConnection returns connections for the particular pattern - GetConnection(pattern string) []string - - // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example) - Construct(key string) (Storage, error) -} - -type Publisher interface { - // Publish one or multiple Channel. - Publish(messages ...*Message) error -} - -// Message represent single message. -type Message struct { - // Topic message been pushed into. - Topic string `json:"topic"` - - // Payload to be broadcasted. Must be valid json when transferred over RPC. - Payload json.RawMessage `json:"payload"` -} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go deleted file mode 100644 index 80527e4b..00000000 --- a/plugins/broadcast/memory/driver.go +++ /dev/null @@ -1,29 +0,0 @@ -package memory - -import ( - "github.com/spiral/roadrunner/v2/plugins/broadcast" -) - -type Driver struct { -} - -func NewInMemoryDriver() broadcast.Storage { - b := &Driver{} - return b -} - -func (d *Driver) Store(uuid string, topics ...string) { - panic("implement me") -} - -func (d *Driver) StorePattern(uuid string, pattern string) { - panic("implement me") -} - -func (d *Driver) GetConnection(pattern string) []string { - panic("implement me") -} - -func (d *Driver) Construct(key string) (broadcast.Storage, error) { - return nil, nil -} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go deleted file mode 100644 index 156bea80..00000000 --- a/plugins/broadcast/plugin.go +++ /dev/null @@ -1,105 +0,0 @@ -package broadcast - -import ( - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" -) - -type Plugin struct { - broker Subscriber - driver Storage - - log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("broadcast_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() - - p.log = log - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("broadcast_plugin_serve") - errCh := make(chan error) - - // if there are no brokers, return nil - if p.broker == nil { - errCh <- errors.E(op, errors.Str("no broker detected")) - return errCh - } - - if p.driver == nil { - // Or if no storage detected, use in-memory storage - errCh <- errors.E(op, errors.Str("no storage detected")) - return errCh - } - - // start the underlying broker - go func() { - // err := p.broker.Serve() - // if err != nil { - // errCh <- errors.E(op, err) - // } - }() - - return errCh -} - -func (p *Plugin) Stop() error { - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectSubscriber, - } -} - -func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) { - p.broker = subscriber -} - -func (p *Plugin) CollectStorage(name endure.Named, storage Storage) { - p.driver = storage -} - -func (p *Plugin) RPC() interface{} { - // create an RPC service for the collects - r := &rpc{ - log: p.log, - svc: p, - } - return r -} - -func (p *Plugin) Publish(msg []*Message) error { - const op = errors.Op("broadcast_plugin_publish") - return nil -} diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go deleted file mode 100644 index 556d5f03..00000000 --- a/plugins/broadcast/redis/driver.go +++ /dev/null @@ -1,29 +0,0 @@ -package redis - -import ( - "github.com/spiral/roadrunner/v2/plugins/broadcast" -) - -type Driver struct { -} - -func NewInMemoryDriver() broadcast.Storage { - b := &Driver{} - return b -} - -func (d *Driver) Store(uuid string, topics ...string) { - panic("implement me") -} - -func (d *Driver) StorePattern(uuid string, pattern string) { - panic("implement me") -} - -func (d *Driver) GetConnection(pattern string) []string { - panic("implement me") -} - -func (d *Driver) Construct(key string) (broadcast.Storage, error) { - panic("implement me") -} diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go deleted file mode 100644 index 65a229e1..00000000 --- a/plugins/broadcast/redis/plugin.go +++ /dev/null @@ -1 +0,0 @@ -package redis diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go deleted file mode 100644 index 65a229e1..00000000 --- a/plugins/broadcast/redis/storage.go +++ /dev/null @@ -1 +0,0 @@ -package redis diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go deleted file mode 100644 index 948fd7ae..00000000 --- a/plugins/broadcast/rpc.go +++ /dev/null @@ -1,26 +0,0 @@ -package broadcast - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type rpc struct { - log logger.Logger - svc *Plugin -} - -func (r *rpc) Publish(msg []*Message, ok *bool) error { - const op = errors.Op("broadcast_publish") - err := r.svc.Publish(msg) - if err != nil { - *ok = false - return errors.E(op, err) - } - *ok = true - return nil -} - -func (r *rpc) PublishAsync() { - -} diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go deleted file mode 100644 index 25943f0a..00000000 --- a/plugins/broadcast/ws/commands/join.go +++ /dev/null @@ -1,10 +0,0 @@ -package commands - -// Join command to save the connection -type Join struct { - Command string `mapstructure:"command"` -} - -func JoinCommand() { - -} diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/leave.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go deleted file mode 100644 index cdff10da..00000000 --- a/plugins/broadcast/ws/commands/subscribe.go +++ /dev/null @@ -1 +0,0 @@ -package commands diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go deleted file mode 100644 index 1d4132b4..00000000 --- a/plugins/broadcast/ws/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package ws - -/* -broadcast: - ws-us-region-1: - subscriber: ws - path: "/ws" - - driver: redis - address: - - 6379 - db: 0 -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // http path for the websocket - Path string `mapstructure:"Path"` -} - -// InitDefault initialize default values for the ws config -func (c *Config) InitDefault() { - if c.Path == "" { - c.Path = "/ws" - } -} diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go deleted file mode 100644 index f075864b..00000000 --- a/plugins/broadcast/ws/plugin.go +++ /dev/null @@ -1,59 +0,0 @@ -package ws - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - RootPluginName = "broadcast" - PluginName = "websockets" -) - -type Plugin struct { - // logger - log logger.Logger - // configurer plugin - cfg config.Configurer -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("ws_plugin_init") - - // check for the configuration section existence - if !cfg.Has(RootPluginName) { - return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration")) - } - - p.cfg = cfg - p.log = log - - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Provides Provide a ws implementation -func (p *Plugin) Provides() []interface{} { - return []interface{}{ - p.Websocket, - } -} - -// Websocket method should provide the Subscriber implementation to the broadcast -func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) { - const op = errors.Op("websocket_subscriber_provide") - // initialize subscriber with the storage - ws, err := NewWSSubscriber(storage) - if err != nil { - return nil, errors.E(op, err) - } - - return ws, nil -} - -func (p *Plugin) Available() {} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go deleted file mode 100644 index 660efdca..00000000 --- a/plugins/broadcast/ws/subscriber.go +++ /dev/null @@ -1,50 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection" -) - -type Subscriber struct { - connections map[string]*connection.Connection - storage broadcast.Storage -} - -// config -// -func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) { - m := make(map[string]*connection.Connection) - - go func() { - app := fiber.New() - app.Use("/ws", wsMiddleware) - app.Listen(":8080") - }() - - return &Subscriber{ - connections: m, - storage: storage, - }, nil -} - -func (s *Subscriber) Subscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) SubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Unsubscribe(topics ...string) error { - panic("implement me") -} - -func (s *Subscriber) UnsubscribePattern(pattern string) error { - panic("implement me") -} - -func (s *Subscriber) Publish(messages ...*broadcast.Message) error { - s.storage.GetConnection(messages[9].Topic) - return nil -} diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go deleted file mode 100644 index 068ef9fb..00000000 --- a/plugins/broadcast/ws/ws_middleware.go +++ /dev/null @@ -1,13 +0,0 @@ -package ws - -import ( - "github.com/gofiber/fiber/v2" - "github.com/gofiber/websocket/v2" -) - -func wsMiddleware(c *fiber.Ctx) error { - if websocket.IsWebSocketUpgrade(c) { - return c.Next() - } - return fiber.ErrUpgradeRequired -} diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go index d2183411..3694c5a7 100644 --- a/plugins/kv/drivers/redis/plugin.go +++ b/plugins/kv/drivers/redis/plugin.go @@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { // Serve is noop here func (s *Plugin) Serve() chan error { - return make(chan error, 1) + return make(chan error) } func (s *Plugin) Stop() error { diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/memory/bst/bst.go index 7d09a10f..3060ff11 100644 --- a/plugins/broadcast/memory/bst/bst.go +++ b/plugins/memory/bst/bst.go @@ -17,8 +17,8 @@ func NewBST() Storage { } // Insert uuid to the topic -func (B *BST) Insert(uuid string, topic string) { - curr := B +func (b *BST) Insert(uuid string, topic string) { + curr := b for { if curr.topic == topic { @@ -50,29 +50,31 @@ func (B *BST) Insert(uuid string, topic string) { } } -func (B *BST) Get(topic string) map[string]struct{} { - curr := B +func (b *BST) Get(topic string) map[string]struct{} { + curr := b for curr != nil { if curr.topic == topic { return curr.uuids } if curr.topic < topic { curr = curr.left + continue } if curr.topic > topic { curr = curr.right + continue } } return nil } -func (B *BST) Remove(uuid string, topic string) { - B.removeHelper(uuid, topic, nil) +func (b *BST) Remove(uuid string, topic string) { + b.removeHelper(uuid, topic, nil) } -func (B *BST) removeHelper(uuid string, topic string, parent *BST) { - curr := B +func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit + curr := b for curr != nil { if topic < curr.topic { parent = curr @@ -126,9 +128,9 @@ func (B *BST) removeHelper(uuid string, topic string, parent *BST) { } //go:inline -func (B *BST) traverseForMinString() (string, map[string]struct{}) { - if B.left == nil { - return B.topic, B.uuids +func (b *BST) traverseForMinString() (string, map[string]struct{}) { + if b.left == nil { + return b.topic, b.uuids } - return B.left.traverseForMinString() + return b.left.traverseForMinString() } diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go index b5ad6c10..e8a13760 100644 --- a/plugins/broadcast/memory/bst/bst_test.go +++ b/plugins/memory/bst/bst_test.go @@ -8,6 +8,7 @@ import ( ) func TestNewBST(t *testing.T) { + // create a new bst g := NewBST() for i := 0; i < 100; i++ { @@ -22,12 +23,15 @@ func TestNewBST(t *testing.T) { g.Insert(uuid.NewString(), "comments3") } + // should be 100 exist := g.Get("comments") assert.Len(t, exist, 100) + // should be 100 exist2 := g.Get("comments2") assert.Len(t, exist2, 100) + // should be 100 exist3 := g.Get("comments3") assert.Len(t, exist3, 100) } diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/memory/bst/interface.go index ecf40414..ecf40414 100644 --- a/plugins/broadcast/memory/bst/interface.go +++ b/plugins/memory/bst/interface.go diff --git a/plugins/broadcast/memory/config.go b/plugins/memory/config.go index e80695bc..08dd9fc3 100644 --- a/plugins/broadcast/memory/config.go +++ b/plugins/memory/config.go @@ -1,6 +1,8 @@ package memory // Config for the memory driver is empty, it's just a placeholder -type Config struct{} +type Config struct { + Path string `mapstructure:"path"` +} func (c *Config) InitDefaults() {} diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go new file mode 100644 index 00000000..5a96e773 --- /dev/null +++ b/plugins/memory/driver.go @@ -0,0 +1,28 @@ +package memory + +import ( + "github.com/spiral/roadrunner/v2/plugins/memory/bst" +) + +type Driver struct { + tree bst.Storage +} + +func NewInMemoryDriver() bst.Storage { + b := &Driver{ + tree: bst.NewBST(), + } + return b +} + +func (d *Driver) Insert(uuid string, topic string) { + d.tree.Insert(uuid, topic) +} + +func (d *Driver) Remove(uuid, topic string) { + d.tree.Remove(uuid, topic) +} + +func (d *Driver) Get(topic string) map[string]struct{} { + return d.tree.Get(topic) +} diff --git a/plugins/broadcast/memory/plugin.go b/plugins/memory/plugin.go index 2bd894a0..5efd5522 100644 --- a/plugins/broadcast/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -1,17 +1,14 @@ package memory import ( - "fmt" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) const ( - PluginName string = "broadcast" - SectionName string = "memory" + PluginName string = "memory" ) type Plugin struct { @@ -26,17 +23,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return errors.E(op, errors.Disabled) } - if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() - p.log = log return nil } @@ -49,7 +35,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - return nil } @@ -58,10 +43,25 @@ func (p *Plugin) Available() {} // Name is endure.Named interface implementation func (p *Plugin) Name() string { - // broadcast.memory - return fmt.Sprintf("%s.%s", PluginName, SectionName) + return PluginName } -func (p *Plugin) Publish(msg []*broadcast.Message) error { - return nil +func (p *Plugin) Publish(messages []pubsub.Message) error { + panic("implement me") +} + +func (p *Plugin) PublishAsync(messages []pubsub.Message) { + panic("implement me") +} + +func (p *Plugin) Subscribe(topics ...string) error { + panic("implement me") +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + panic("implement me") +} + +func (p *Plugin) Next() (pubsub.Message, error) { + panic("implement me") } diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go new file mode 100644 index 00000000..29016720 --- /dev/null +++ b/plugins/redis/fanin.go @@ -0,0 +1,100 @@ +package redis + +import ( + "context" + "sync" + + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/utils" +) + +type FanIn struct { + sync.Mutex + + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan pubsub.Message + + exit chan struct{} +} + +func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { + out := make(chan pubsub.Message, 100) + fi := &FanIn{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (fi *FanIn) AddChannel(topics ...string) error { + const op = errors.Op("fanin_addchannel") + err := fi.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (fi *FanIn) read() { + for { + select { + //here we receive message from us (which we sent before in Publish) + //it should be compatible with the websockets.Msg interface + //payload should be in the redis.message.payload field + + case msg, ok := <-fi.pubsub.Channel(): + // channel closed + if !ok { + return + } + m := &pubsub.Msg{} + err := json.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("failed to unmarshal payload", "error", err.Error()) + continue + } + + fi.out <- m + case <-fi.exit: + return + } + } +} + +func (fi *FanIn) RemoveChannel(topics ...string) error { + const op = errors.Op("fanin_remove") + err := fi.pubsub.Unsubscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (fi *FanIn) Stop() error { + fi.exit <- struct{}{} + close(fi.out) + close(fi.exit) + return nil +} + +func (fi *FanIn) Consume() <-chan pubsub.Message { + return fi.out +} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index f0011690..24ed1f92 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -1,8 +1,12 @@ package redis import ( + "context" + "sync" + "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -10,72 +14,133 @@ import ( const PluginName = "redis" type Plugin struct { + sync.Mutex // config for RR integration cfg *Config // logger log logger.Logger // redis universal client universalClient redis.UniversalClient + + fanin *FanIn } -func (s *Plugin) GetClient() redis.UniversalClient { - return s.universalClient +func (p *Plugin) GetClient() redis.UniversalClient { + return p.universalClient } -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("redis_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - s.cfg.InitDefaults() - s.log = log - - s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: s.cfg.Addrs, - DB: s.cfg.DB, - Username: s.cfg.Username, - Password: s.cfg.Password, - SentinelPassword: s.cfg.SentinelPassword, - MaxRetries: s.cfg.MaxRetries, - MinRetryBackoff: s.cfg.MaxRetryBackoff, - MaxRetryBackoff: s.cfg.MaxRetryBackoff, - DialTimeout: s.cfg.DialTimeout, - ReadTimeout: s.cfg.ReadTimeout, - WriteTimeout: s.cfg.WriteTimeout, - PoolSize: s.cfg.PoolSize, - MinIdleConns: s.cfg.MinIdleConns, - MaxConnAge: s.cfg.MaxConnAge, - PoolTimeout: s.cfg.PoolTimeout, - IdleTimeout: s.cfg.IdleTimeout, - IdleCheckFrequency: s.cfg.IdleCheckFreq, - ReadOnly: s.cfg.ReadOnly, - RouteByLatency: s.cfg.RouteByLatency, - RouteRandomly: s.cfg.RouteRandomly, - MasterName: s.cfg.MasterName, + p.cfg.InitDefaults() + p.log = log + + p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: p.cfg.Addrs, + DB: p.cfg.DB, + Username: p.cfg.Username, + Password: p.cfg.Password, + SentinelPassword: p.cfg.SentinelPassword, + MaxRetries: p.cfg.MaxRetries, + MinRetryBackoff: p.cfg.MaxRetryBackoff, + MaxRetryBackoff: p.cfg.MaxRetryBackoff, + DialTimeout: p.cfg.DialTimeout, + ReadTimeout: p.cfg.ReadTimeout, + WriteTimeout: p.cfg.WriteTimeout, + PoolSize: p.cfg.PoolSize, + MinIdleConns: p.cfg.MinIdleConns, + MaxConnAge: p.cfg.MaxConnAge, + PoolTimeout: p.cfg.PoolTimeout, + IdleTimeout: p.cfg.IdleTimeout, + IdleCheckFrequency: p.cfg.IdleCheckFreq, + ReadOnly: p.cfg.ReadOnly, + RouteByLatency: p.cfg.RouteByLatency, + RouteRandomly: p.cfg.RouteRandomly, + MasterName: p.cfg.MasterName, }) + // init fanin + p.fanin = NewFanIn(p.universalClient, log) + return nil } -func (s *Plugin) Serve() chan error { - errCh := make(chan error, 1) +func (p *Plugin) Serve() chan error { + errCh := make(chan error) return errCh } -func (s Plugin) Stop() error { - return s.universalClient.Close() +func (p *Plugin) Stop() error { + const op = errors.Op("redis_plugin_stop") + err := p.fanin.Stop() + if err != nil { + return errors.E(op, err) + } + + err = p.universalClient.Close() + if err != nil { + return errors.E(op, err) + } + + return nil } -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Available interface implementation -func (s *Plugin) Available() {} +func (p *Plugin) Available() {} + +func (p *Plugin) Publish(msg []pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + if f.Err() != nil { + return f.Err() + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i]) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error()) + continue + } + } + } + }() +} + +func (p *Plugin) Subscribe(topics ...string) error { + return p.fanin.AddChannel(topics...) +} + +func (p *Plugin) Unsubscribe(topics ...string) error { + return p.fanin.RemoveChannel(topics...) +} + +// Next return next message +func (p *Plugin) Next() (pubsub.Message, error) { + return <-p.fanin.Consume(), nil +} diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go new file mode 100644 index 00000000..18c63be3 --- /dev/null +++ b/plugins/websockets/commands/enums.go @@ -0,0 +1,9 @@ +package commands + +type Command string + +const ( + Leave string = "leave" + Join string = "join" + Headers string = "headers" +) diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go new file mode 100644 index 00000000..f3cb8e12 --- /dev/null +++ b/plugins/websockets/config.go @@ -0,0 +1,67 @@ +package websockets + +import "time" + +/* +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # also, they should implement RPC methods to publish data into them + # pubsubs might use general config section or its own + + pubsubs:["redis", "amqp", "memory"] + + # sample of the own config section for the redis pubsub driver + redis: + address: + - localhost:1111 + .... the rest + + + # path used as websockets path + path: "/ws" +*/ + +// Config represents configuration for the ws plugin +type Config struct { + // http path for the websocket + Path string `mapstructure:"path"` + // ["redis", "amqp", "memory"] + PubSubs []string `mapstructure:"pubsubs"` + Middleware []string `mapstructure:"middleware"` + Redis *RedisConfig `mapstructure:"redis"` +} + +type RedisConfig struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefault initialize default values for the ws config +func (c *Config) InitDefault() { + if c.Path == "" { + c.Path = "/ws" + } + if len(c.PubSubs) == 0 { + // memory used by default + c.PubSubs = append(c.PubSubs, "memory") + } +} diff --git a/plugins/broadcast/ws/connection/connection.go b/plugins/websockets/connection/connection.go index cfb47e35..5eb30c61 100644 --- a/plugins/broadcast/ws/connection/connection.go +++ b/plugins/websockets/connection/connection.go @@ -3,7 +3,7 @@ package connection import ( "sync" - "github.com/gofiber/websocket/v2" + "github.com/fasthttp/websocket" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -43,8 +43,8 @@ func (c *Connection) Write(mt int, data []byte) error { } func (c *Connection) Read() (int, []byte, error) { - c.RLock() - defer c.RUnlock() + //c.RLock() + //defer c.RUnlock() const op = errors.Op("websocket_read") mt, data, err := c.conn.ReadMessage() diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio new file mode 100644 index 00000000..748fec45 --- /dev/null +++ b/plugins/websockets/doc/broadcast.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio new file mode 100644 index 00000000..54bba9c7 --- /dev/null +++ b/plugins/websockets/doc/broadcast_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/websockets/doc/ws.drawio index 739b797a..739b797a 100644 --- a/plugins/broadcast/doc/ws.drawio +++ b/plugins/websockets/doc/ws.drawio diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go new file mode 100644 index 00000000..391c9a8c --- /dev/null +++ b/plugins/websockets/executor/executor.go @@ -0,0 +1,146 @@ +package executor + +import ( + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/commands" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type Response struct { + Topic string `json:"topic"` + Payload []string `json:"payload"` +} + +type Executor struct { + conn *connection.Connection + storage *storage.Storage + log logger.Logger + + // associated connection ID + connID string + pubsub pubsub.PubSub +} + +// NewExecutor creates protected connection and starts command loop +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { + return &Executor{ + conn: conn, + connID: connID, + storage: bst, + log: log, + pubsub: pubsubs, + } +} + +func (e *Executor) StartCommandLoop() error { + for { + mt, data, err := e.conn.Read() + if err != nil { + if mt == -1 { + return err + } + + return err + } + + msg := &pubsub.Msg{} + + err = json.Unmarshal(data, msg) + if err != nil { + e.log.Error("error unmarshal message", "error", err) + continue + } + + switch msg.Command() { + // handle leave + case commands.Join: + // TODO access validators model update + //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) + //// validation error + //if err != nil { + // e.log.Error("validation error", "error", err) + // + // resp := &Response{ + // Topic: "#join", + // Payload: msg.Topics(), + // } + // + // packet, err := json.Marshal(resp) + // if err != nil { + // e.log.Error("error marshal the body", "error", err) + // return err + // } + // + // err = e.conn.Write(websocket.BinaryMessage, packet) + // if err != nil { + // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + // continue + // } + // + // continue + //} + // associate connection with topics + e.storage.Store(e.connID, msg.Topics()) + + resp := &Response{ + Topic: "@join", + Payload: msg.Topics(), + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + continue + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + continue + } + + err = e.pubsub.Subscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + + // handle leave + case commands.Leave: + // remove associated connections from the storage + e.storage.Remove(e.connID, msg.Topics()) + + resp := &Response{ + Topic: "@leave", + Payload: msg.Topics(), + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + continue + } + + err = e.pubsub.Unsubscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + continue + } + + case commands.Headers: + + default: + e.log.Warn("unknown command", "command", msg.Command()) + } + } +} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go new file mode 100644 index 00000000..a247da69 --- /dev/null +++ b/plugins/websockets/plugin.go @@ -0,0 +1,206 @@ +package websockets + +import ( + "net/http" + "sync" + "time" + + "github.com/fasthttp/websocket" + "github.com/google/uuid" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/pool" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +const ( + PluginName string = "websockets" +) + +type Plugin struct { + sync.RWMutex + // Collection with all available pubsubs + pubsubs map[string]pubsub.PubSub + + Config *Config + log logger.Logger + + // global connections map + connections sync.Map + storage *storage.Storage + + workersPool *pool.WorkersPool +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("websockets_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.Config) + if err != nil { + return errors.E(op, err) + } + + p.pubsubs = make(map[string]pubsub.PubSub) + p.log = log + p.storage = storage.NewStorage() + p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error) + go func() { + ps := p.pubsubs["redis"] + + for { + // get message + // get topic + // get connection uuid from the storage by the topic + // write payload into the connection + // do that in the workers pool + data, err := ps.Next() + if err != nil { + errCh <- err + return + } + + if data == nil { + continue + } + + p.workersPool.Queue(data) + } + }() + return errCh +} + +func (p *Plugin) Stop() error { + p.workersPool.Stop() + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.GetPublishers, + } +} + +func (p *Plugin) Available() {} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + plugin: p, + log: p.log, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +// GetPublishers collects all pubsubs +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { + p.pubsubs[name.Name()] = pub +} + +func (p *Plugin) Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != p.Config.Path { + next.ServeHTTP(w, r) + return + } + + // connection upgrader + upgraded := websocket.Upgrader{ + HandshakeTimeout: time.Second * 60, + ReadBufferSize: 0, + WriteBufferSize: 0, + WriteBufferPool: nil, + Subprotocols: nil, + Error: nil, + CheckOrigin: nil, + EnableCompression: false, + } + + // upgrade connection to websocket connection + _conn, err := upgraded.Upgrade(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + safeConn := connection.NewConnection(_conn, p.log) + defer func() { + err = safeConn.Close() + if err != nil { + p.log.Error("connection close error", "error", err) + } + }() + + // generate UUID from the connection + connectionID := uuid.NewString() + // store connection + p.connections.Store(connectionID, safeConn) + // when exiting - delete the connection + defer func() { + p.connections.Delete(connectionID) + }() + + // Executor wraps a connection to have a safe abstraction + p.Lock() + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"]) + p.Unlock() + + p.log.Info("websocket client connected", "uuid", connectionID) + + err = e.StartCommandLoop() + if err != nil { + p.log.Error("command loop error", "error", err.Error()) + return + } + }) +} + +func (p *Plugin) Publish(msg []pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + if br, ok := p.pubsubs[msg[i].Broker()]; ok { + err := br.Publish(msg) + if err != nil { + return errors.E(err) + } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker()) + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + err := p.pubsubs[msg[i].Broker()].Publish(msg) + if err != nil { + p.log.Error("publish async error", "error", err) + return + } + + } + } + }() +} diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go new file mode 100644 index 00000000..ee31d62f --- /dev/null +++ b/plugins/websockets/pool/workers_pool.go @@ -0,0 +1,104 @@ +package pool + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type WorkersPool struct { + storage *storage.Storage + connections *sync.Map + resPool sync.Pool + log logger.Logger + + queue chan pubsub.Message + exit chan struct{} +} + +func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { + wp := &WorkersPool{ + connections: connections, + queue: make(chan pubsub.Message, 100), + storage: storage, + log: log, + exit: make(chan struct{}), + } + + wp.resPool.New = func() interface{} { + return make(map[string]struct{}, 10) + } + + for i := 0; i < 10; i++ { + wp.do() + } + + return wp +} + +func (wp *WorkersPool) Queue(msg pubsub.Message) { + wp.queue <- msg +} + +func (wp *WorkersPool) Stop() { + for i := 0; i < 10; i++ { + wp.exit <- struct{}{} + } + + close(wp.exit) +} + +func (wp *WorkersPool) put(res map[string]struct{}) { + // optimized + // https://go-review.googlesource.com/c/go/+/110055/ + // not O(n), but O(1) + for k := range res { + delete(res, k) + } +} + +func (wp *WorkersPool) get() map[string]struct{} { + return wp.resPool.Get().(map[string]struct{}) +} + +func (wp *WorkersPool) do() { + go func() { + for { + select { + case msg := <-wp.queue: + res := wp.get() + // get connections for the particular topic + wp.storage.Get(msg.Topics(), res) + if len(res) == 0 { + wp.log.Info("no such topic", "topic", msg.Topics()) + wp.put(res) + continue + } + + for i := range res { + c, ok := wp.connections.Load(i) + if !ok { + panic("not ok here (((") + } + + conn := c.(*connection.Connection) + err := conn.Write(websocket.BinaryMessage, msg.Payload()) + if err != nil { + // TODO handle error + wp.put(res) + continue + } + } + + wp.put(res) + case <-wp.exit: + wp.log.Info("get exit signal, exiting from the workers pool") + return + } + } + }() +} diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go new file mode 100644 index 00000000..0f0303b7 --- /dev/null +++ b/plugins/websockets/rpc.go @@ -0,0 +1,46 @@ +package websockets + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + plugin *Plugin + log logger.Logger +} + +func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { + const op = errors.Op("broadcast_publish") + + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + err := r.plugin.Publish(mi) + if err != nil { + *ok = false + return errors.E(op, err) + } + *ok = true + return nil +} + +func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error { + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + + r.plugin.PublishAsync(mi) + + *ok = true + return nil +} diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go new file mode 100644 index 00000000..a7e49207 --- /dev/null +++ b/plugins/websockets/storage/storage.go @@ -0,0 +1,50 @@ +package storage + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/plugins/memory/bst" +) + +type Storage struct { + sync.RWMutex + BST bst.Storage +} + +func NewStorage() *Storage { + return &Storage{ + BST: bst.NewBST(), + } +} + +func (s *Storage) Store(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Insert(connID, topics[i]) + } +} + +func (s *Storage) Remove(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Remove(connID, topics[i]) + } +} + +func (s *Storage) Get(topics []string, res map[string]struct{}) { + s.RLock() + defer s.RUnlock() + + for i := 0; i < len(topics); i++ { + d := s.BST.Get(topics[i]) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } + } +} diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go new file mode 100644 index 00000000..9d9522d4 --- /dev/null +++ b/plugins/websockets/validator/access_validator.go @@ -0,0 +1,102 @@ +package validator + +import ( + "bytes" + "io" + "net/http" + "strings" + + "github.com/spiral/roadrunner/v2/plugins/http/attributes" +) + +type AccessValidator struct { + buffer *bytes.Buffer + header http.Header + status int +} + +func NewValidator() *AccessValidator { + return &AccessValidator{ + buffer: bytes.NewBuffer(nil), + header: make(http.Header), + } +} + +// Copy all content to parent response writer. +func (w *AccessValidator) Copy(rw http.ResponseWriter) { + rw.WriteHeader(w.status) + + for k, v := range w.header { + for _, vv := range v { + rw.Header().Add(k, vv) + } + } + + _, _ = io.Copy(rw, w.buffer) +} + +// Header returns the header map that will be sent by WriteHeader. +func (w *AccessValidator) Header() http.Header { + return w.header +} + +// Write writes the data to the connection as part of an HTTP reply. +func (w *AccessValidator) Write(p []byte) (int, error) { + return w.buffer.Write(p) +} + +// WriteHeader sends an HTTP response header with the provided status code. +func (w *AccessValidator) WriteHeader(statusCode int) { + w.status = statusCode +} + +// IsOK returns true if response contained 200 status code. +func (w *AccessValidator) IsOK() bool { + return w.status == 200 +} + +// Body returns response body to rely to user. +func (w *AccessValidator) Body() []byte { + return w.buffer.Bytes() +} + +// Error contains server response. +func (w *AccessValidator) Error() string { + return w.buffer.String() +} + +// AssertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in +// case of error +func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error { + if err := attributes.Set(r, "ws:joinServer", true); err != nil { + return err + } + + defer delete(attributes.All(r), "ws:joinServer") + + f(w, r) + + if !w.IsOK() { + return w + } + + return nil +} + +// AssertTopicsAccess checks if user can access given upstream, the application will receive all user headers and cookies. +// the decision to authorize user will be based on response code (200). +func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error { + if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil { + return err + } + + defer delete(attributes.All(r), "ws:joinTopics") + + f(w, r) + + if !w.IsOK() { + return w + } + + return nil +} diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go new file mode 100644 index 00000000..4a07b00f --- /dev/null +++ b/plugins/websockets/validator/access_validator_test.go @@ -0,0 +1,35 @@ +package validator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResponseWrapper_Body(t *testing.T) { + w := NewValidator() + _, _ = w.Write([]byte("hello")) + + assert.Equal(t, []byte("hello"), w.Body()) +} + +func TestResponseWrapper_Header(t *testing.T) { + w := NewValidator() + w.Header().Set("k", "value") + + assert.Equal(t, "value", w.Header().Get("k")) +} + +func TestResponseWrapper_StatusCode(t *testing.T) { + w := NewValidator() + w.WriteHeader(200) + + assert.True(t, w.IsOK()) +} + +func TestResponseWrapper_StatusCodeBad(t *testing.T) { + w := NewValidator() + w.WriteHeader(400) + + assert.False(t, w.IsOK()) +} diff --git a/tests/Dockerfile b/tests/Dockerfile new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/Dockerfile diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 67d5476b..a8e8a7c5 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -7,5 +7,8 @@ services: - "0.0.0.0:11211:11211" redis: image: redis:6 + mem_limit: 16384m + mem_reservation: 2048M + cpus: 8 ports: - "6379:6379" diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml index e1edbb44..94c9a856 100644 --- a/tests/plugins/informer/.rr-informer.yaml +++ b/tests/plugins/informer/.rr-informer.yaml @@ -3,8 +3,8 @@ server: user: "" group: "" env: - "RR_CONFIG": "/some/place/on/the/C134" - "RR_CONFIG2": "C138" + - RR_CONFIG: "/some/place/on/the/C134" + - RR_CONFIG: "C138" relay: "pipes" relay_timeout: "20s" @@ -12,4 +12,4 @@ rpc: listen: tcp://127.0.0.1:6001 logs: mode: development - level: error
\ No newline at end of file + level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml new file mode 100644 index 00000000..9973b2dc --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -0,0 +1,50 @@ +rpc: + listen: tcp://127.0.0.1:6001 + + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:15395 + max_request_size: 1024 + middleware: ["websockets"] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # also, they should implement RPC methods to publish data into them + # pubsubs might use general config section or its own + + pubsubs: [ "redis" ] + + # sample of the own config section for the redis pubsub driver + + redis: + addrs: + - localhost:1111 + # path used as websockets path + path: "/ws" +logs: + mode: development + level: debug + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go new file mode 100644 index 00000000..a9b90fd0 --- /dev/null +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -0,0 +1,102 @@ +package websockets + +import ( + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/redis" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/stretchr/testify/assert" +) + +func TestBroadcastInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1000) + t.Run("test1", test1) + t.Run("test2", test2) + + stopCh <- struct{}{} + + wg.Wait() +} + +func test1(t *testing.T) { + +} + +func test2(t *testing.T) { + +} |