diff options
Diffstat (limited to 'plugins')
30 files changed, 480 insertions, 893 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go new file mode 100644 index 00000000..92f7951d --- /dev/null +++ b/plugins/broadcast/config.go @@ -0,0 +1,12 @@ +package broadcast + +type Config struct { +} + +func (c *Config) InitDefaults() { + +} + +func (c *Config) Valid() error { + return nil +} diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml new file mode 100644 index 00000000..8b0eef20 --- /dev/null +++ b/plugins/broadcast/doc/.rr-broadcast.yaml @@ -0,0 +1,29 @@ +broadcast: + # path to enable web-socket handler middleware + path: /ws + + # optional, redis broker configuration + redis: + addrs: + - "localhost:6379" + # if a MasterName is passed a sentinel-backed FailoverClient will be returned + master_name: "" + username: "" + password: "" + db: 0 + sentinel_password: "" + route_by_latency: false + route_randomly: false + dial_timeout: 0 # accepted values [1s, 5m, 3h] + max_retries: 1 + min_retry_backoff: 0 # accepted values [1s, 5m, 3h] + max_retry_backoff: 0 # accepted values [1s, 5m, 3h] + pool_size: 0 + min_idle_conns: 0 + max_conn_age: 0 # accepted values [1s, 5m, 3h] + read_timeout: 0 # accepted values [1s, 5m, 3h] + write_timeout: 0 # accepted values [1s, 5m, 3h] + pool_timeout: 0 # accepted values [1s, 5m, 3h] + idle_timeout: 0 # accepted values [1s, 5m, 3h] + idle_check_freq: 0 # accepted values [1s, 5m, 3h] + read_only: false diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio new file mode 100644 index 00000000..2339f5b1 --- /dev/null +++ b/plugins/broadcast/doc/broadcast.drawio @@ -0,0 +1,151 @@ +<mxfile> + <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1"> + <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0"> + <root> + <mxCell id="0"/> + <mxCell id="1" parent="0"/> + <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"> + <mxPoint x="620" y="440" as="targetPoint"/> + </mxGeometry> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1"> + <mxGeometry relative="1" as="geometry"> + <Array as="points"> + <mxPoint x="460" y="240"/> + <mxPoint x="270" y="240"/> + </Array> + </mxGeometry> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1"> + <mxGeometry x="0.0526" y="10" relative="1" as="geometry"> + <mxPoint x="459.72413793103465" y="510" as="targetPoint"/> + <mxPoint x="-10" y="10" as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1"> + <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1"> + <mxGeometry relative="1" as="geometry"> + <Array as="points"> + <mxPoint x="240" y="160"/> + <mxPoint x="696" y="160"/> + </Array> + </mxGeometry> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0"> + <mxGeometry x="-0.2172" relative="1" as="geometry"> + <mxPoint x="108.62" y="-10" as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1"> + <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1"> + <mxGeometry relative="1" as="geometry"> + <mxPoint x="50" y="440.2068965517242" as="targetPoint"/> + </mxGeometry> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1"> + <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> + <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> + <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry x="0.1429" y="15" relative="1" as="geometry"> + <mxPoint as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1"> + <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0"> + <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry"> + <mxPoint x="-33.3" y="-13" as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1"> + <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0"> + <mxGeometry x="0.2355" y="-1" relative="1" as="geometry"> + <mxPoint x="9" y="12.69" as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> + <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1"> + <mxGeometry x="0.2" y="-10" relative="1" as="geometry"> + <Array as="points"> + <mxPoint x="680" y="620"/> + <mxPoint x="680" y="620"/> + </Array> + <mxPoint as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> + <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/> + </mxCell> + <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1"> + <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1"> + <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="<p style="margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline"><b>Structure</b></p><hr><p style="margin: 0px ; margin-left: 8px">Topic = string<br>Payload = raw<br></p><p style="margin: 0px ; margin-left: 8px">Broker = ws (from config) (hardcoded)<br></p>" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1"> + <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1"> + <mxGeometry width="50" height="50" relative="1" as="geometry"> + <mxPoint x="740" y="608" as="sourcePoint"/> + <mxPoint x="790" y="558" as="targetPoint"/> + </mxGeometry> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1"> + <mxGeometry relative="1" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1"> + <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1"> + <mxGeometry relative="1" as="geometry"> + <Array as="points"> + <mxPoint x="160" y="520"/> + <mxPoint x="160" y="380"/> + <mxPoint x="270" y="380"/> + </Array> + </mxGeometry> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0"> + <mxGeometry x="-0.317" y="3" relative="1" as="geometry"> + <mxPoint x="-7" y="-10.34" as="offset"/> + </mxGeometry> + </mxCell> + <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="<ul><li>Check registered topics</li><li>Redirect to driver</li></ul>" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1"> + <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/> + </mxCell> + </root> + </mxGraphModel> + </diagram> +</mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go new file mode 100644 index 00000000..716b3aac --- /dev/null +++ b/plugins/broadcast/interface.go @@ -0,0 +1,36 @@ +package broadcast + +import "encoding/json" + +// Subscriber defines the ability to operate as message passing broker. +type Subscriber interface { + // Subscribe broker to one or multiple topics. + Subscribe(upstream chan *Message, topics ...string) error + + // SubscribePattern broker to pattern. + SubscribePattern(upstream chan *Message, pattern string) error + + // Unsubscribe broker from one or multiple topics. + Unsubscribe(upstream chan *Message, topics ...string) error + + // UnsubscribePattern broker from pattern. + UnsubscribePattern(upstream chan *Message, pattern string) error +} + +type Storage interface { + Store() +} + +type Publisher interface { + // Publish one or multiple Channel. + Publish(messages ...*Message) error +} + +// Message represent single message. +type Message struct { + // Topic message been pushed into. + Topic string `json:"topic"` + + // Payload to be broadcasted. Must be valid json when transferred over RPC. + Payload json.RawMessage `json:"payload"` +} diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go new file mode 100644 index 00000000..e80695bc --- /dev/null +++ b/plugins/broadcast/memory/config.go @@ -0,0 +1,6 @@ +package memory + +// Config for the memory driver is empty, it's just a placeholder +type Config struct{} + +func (c *Config) InitDefaults() {} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go new file mode 100644 index 00000000..2eb45c8e --- /dev/null +++ b/plugins/broadcast/memory/driver.go @@ -0,0 +1,39 @@ +package memory + +import "github.com/spiral/roadrunner/v2/plugins/broadcast" + +type Driver struct { +} + +func NewInMemoryDriver() broadcast.Subscriber { + b := &Driver{} + return b +} + +func (d *Driver) Serve() error { + panic("implement me") +} + +func (d *Driver) Stop() { + panic("implement me") +} + +func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") +} + +func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} + +func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") +} + +func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} + +func (d *Driver) Publish(messages ...*broadcast.Message) error { + panic("implement me") +} diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go new file mode 100644 index 00000000..2bd894a0 --- /dev/null +++ b/plugins/broadcast/memory/plugin.go @@ -0,0 +1,67 @@ +package memory + +import ( + "fmt" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "broadcast" + SectionName string = "memory" +) + +type Plugin struct { + log logger.Logger + cfg *Config +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("memory_plugin_init") + + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + p.cfg.InitDefaults() + + p.log = log + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("memory_plugin_serve") + errCh := make(chan error) + + return errCh +} + +func (p *Plugin) Stop() error { + + return nil +} + +// Available interface implementation for the plugin +func (p *Plugin) Available() {} + +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + // broadcast.memory + return fmt.Sprintf("%s.%s", PluginName, SectionName) +} + +func (p *Plugin) Publish(msg []*broadcast.Message) error { + return nil +} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go new file mode 100644 index 00000000..7ad9e2ae --- /dev/null +++ b/plugins/broadcast/plugin.go @@ -0,0 +1,95 @@ +package broadcast + +import ( + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "broadcast" +) + +type Plugin struct { + broker Subscriber + driver Storage + + log logger.Logger + cfg *Config +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("broadcast_plugin_init") + + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + p.cfg.InitDefaults() + + p.log = log + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("broadcast_plugin_serve") + errCh := make(chan error) + + // if there are no brokers, return nil + if p.broker == nil { + errCh <- errors.E(op, errors.Str("no broker detected")) + return errCh + } + + // start the underlying broker + go func() { + // err := p.broker.Serve() + // if err != nil { + // errCh <- errors.E(op, err) + // } + }() + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +// Available interface implementation for the plugin +func (p *Plugin) Available() {} + +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectBroker, + } +} + +func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) { + p.broker = broker +} + +func (p *Plugin) RPC() interface{} { + // create an RPC service for the collects + r := &rpc{ + log: p.log, + svc: p, + } + return r +} + +func (p *Plugin) Publish(msg []*Message) error { + const op = errors.Op("broadcast_plugin_publish") + return nil +} diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go new file mode 100644 index 00000000..65a229e1 --- /dev/null +++ b/plugins/broadcast/redis/driver.go @@ -0,0 +1 @@ +package redis diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go new file mode 100644 index 00000000..65a229e1 --- /dev/null +++ b/plugins/broadcast/redis/plugin.go @@ -0,0 +1 @@ +package redis diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go new file mode 100644 index 00000000..65a229e1 --- /dev/null +++ b/plugins/broadcast/redis/storage.go @@ -0,0 +1 @@ +package redis diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go new file mode 100644 index 00000000..948fd7ae --- /dev/null +++ b/plugins/broadcast/rpc.go @@ -0,0 +1,26 @@ +package broadcast + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + log logger.Logger + svc *Plugin +} + +func (r *rpc) Publish(msg []*Message, ok *bool) error { + const op = errors.Op("broadcast_publish") + err := r.svc.Publish(msg) + if err != nil { + *ok = false + return errors.E(op, err) + } + *ok = true + return nil +} + +func (r *rpc) PublishAsync() { + +} diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go new file mode 100644 index 00000000..98592950 --- /dev/null +++ b/plugins/broadcast/ws/config.go @@ -0,0 +1 @@ +package ws diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go new file mode 100644 index 00000000..98592950 --- /dev/null +++ b/plugins/broadcast/ws/plugin.go @@ -0,0 +1 @@ +package ws diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go new file mode 100644 index 00000000..98592950 --- /dev/null +++ b/plugins/broadcast/ws/subscriber.go @@ -0,0 +1 @@ +package ws diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 2b68bbe5..d5324246 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -13,10 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/worker" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config" - handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/status" diff --git a/plugins/http/serve.go b/plugins/http/serve.go index 78796322..734860f5 100644 --- a/plugins/http/serve.go +++ b/plugins/http/serve.go @@ -21,7 +21,7 @@ func (s *Plugin) serveHTTP(errCh chan error) { if s.http == nil { return } - const op = errors.Op("http_plugin_serve_http") + const op = errors.Op("serveHTTP") if len(s.mdwr) > 0 { applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log) @@ -43,7 +43,7 @@ func (s *Plugin) serveHTTPS(errCh chan error) { if s.https == nil { return } - const op = errors.Op("http_plugin_serve_https") + const op = errors.Op("serveHTTPS") if len(s.mdwr) > 0 { applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) } @@ -70,7 +70,7 @@ func (s *Plugin) serveFCGI(errCh chan error) { if s.fcgi == nil { return } - const op = errors.Op("http_plugin_serve_fcgi") + const op = errors.Op("serveFCGI") if len(s.mdwr) > 0 { applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) diff --git a/plugins/http/worker_handler/constants.go b/plugins/http/worker_handler/constants.go deleted file mode 100644 index 3355d9c2..00000000 --- a/plugins/http/worker_handler/constants.go +++ /dev/null @@ -1,8 +0,0 @@ -package handler - -import "net/http" - -var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") - -// TrailerHeaderKey http header key -var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/worker_handler/errors.go b/plugins/http/worker_handler/errors.go deleted file mode 100644 index 5fa8e64e..00000000 --- a/plugins/http/worker_handler/errors.go +++ /dev/null @@ -1,25 +0,0 @@ -// +build !windows - -package handler - -import ( - "errors" - "net" - "os" - "syscall" -) - -// Broken pipe -var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") - -// handleWriteError just check if error was caused by aborted connection on linux -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if errors.Is(syscallErr.Err, syscall.EPIPE) { - return errEPIPE - } - } - } - return err -} diff --git a/plugins/http/worker_handler/errors_windows.go b/plugins/http/worker_handler/errors_windows.go deleted file mode 100644 index 390cc7d1..00000000 --- a/plugins/http/worker_handler/errors_windows.go +++ /dev/null @@ -1,27 +0,0 @@ -// +build windows - -package handler - -import ( - "errors" - "net" - "os" - "syscall" -) - -//Software caused connection abort. -//An established connection was aborted by the software in your host computer, -//possibly due to a data transmission time-out or protocol error. -var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") - -// handleWriteError just check if error was caused by aborted connection on windows -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if syscallErr.Err == syscall.WSAECONNABORTED { - return errEPIPE - } - } - } - return err -} diff --git a/plugins/http/worker_handler/handler.go b/plugins/http/worker_handler/handler.go deleted file mode 100644 index be53fc12..00000000 --- a/plugins/http/worker_handler/handler.go +++ /dev/null @@ -1,217 +0,0 @@ -package handler - -import ( - "net" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// MB is 1024 bytes -const MB uint64 = 1024 * 1024 - -// ErrorEvent represents singular http error event. -type ErrorEvent struct { - // Request contains client request, must not be stored. - Request *http.Request - - // Error - associated error, if any. - Error error - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ErrorEvent) Elapsed() time.Duration { - return e.elapsed -} - -// ResponseEvent represents singular http response event. -type ResponseEvent struct { - // Request contains client request, must not be stored. - Request *Request - - // Response contains service response. - Response *Response - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ResponseEvent) Elapsed() time.Duration { - return e.elapsed -} - -// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type Handler struct { - maxRequestSize uint64 - uploads config.Uploads - trusted config.Cidrs - log logger.Logger - pool pool.Pool - mul sync.Mutex - lsn events.Listener -} - -// NewHandler return handle interface implementation -func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) { - if pool == nil { - return nil, errors.E(errors.Str("pool should be initialized")) - } - return &Handler{ - maxRequestSize: maxReqSize * MB, - uploads: uploads, - pool: pool, - trusted: trusted, - }, nil -} - -// AddListener attaches handler event controller. -func (h *Handler) AddListener(l events.Listener) { - h.mul.Lock() - defer h.mul.Unlock() - - h.lsn = l -} - -// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - const op = errors.Op("http_plugin_serve_http") - start := time.Now() - - // validating request size - if h.maxRequestSize != 0 { - const op = errors.Op("http_handler_max_size") - if length := r.Header.Get("content-length"); length != "" { - // try to parse the value from the `content-length` header - size, err := strconv.ParseInt(length, 10, 64) - if err != nil { - // if got an error while parsing -> assign 500 code to the writer and return - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)}) - return - } - - if size > int64(h.maxRequestSize) { - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)}) - http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500) - return - } - } - } - - req, err := NewRequest(r, h.uploads) - if err != nil { - // if pipe is broken, there is no sense to write the header - // in this case we just report about error - if err == errEPIPE { - h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) - return - } - - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - // proxy IP resolution - h.resolveIP(req) - - req.Open(h.log) - defer req.Close(h.log) - - p, err := req.Payload() - if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - rsp, err := h.pool.Exec(p) - if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - http.Error(w, errors.E(op, err).Error(), resp.Status) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - return - } - - h.handleResponse(req, resp, start) - err = resp.Write(w) - if err != nil { - http.Error(w, errors.E(op, err).Error(), 500) - h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) - } -} - -// handleResponse triggers response event. -func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { - h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) -} - -// sendEvent invokes event handler if any. -func (h *Handler) sendEvent(event interface{}) { - if h.lsn != nil { - h.lsn(event) - } -} - -// get real ip passing multiple proxy -func (h *Handler) resolveIP(r *Request) { - if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple - return - } - - if r.Header.Get("X-Forwarded-For") != "" { - ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") - ipCount := len(ips) - - for i := ipCount - 1; i >= 0; i-- { - addr := strings.TrimSpace(ips[i]) - if net.ParseIP(addr) != nil { - r.RemoteAddr = addr - return - } - } - - return - } - - // The logic here is the following: - // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address - // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers - // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. - // CF-Connecting-IP is an Enterprise feature and we check it last in order. - // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string - if r.Header.Get("X-Real-Ip") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) - return - } - - if r.Header.Get("True-Client-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) - return - } - - if r.Header.Get("CF-Connecting-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) - } -} diff --git a/plugins/http/worker_handler/parse.go b/plugins/http/worker_handler/parse.go deleted file mode 100644 index 2790da2a..00000000 --- a/plugins/http/worker_handler/parse.go +++ /dev/null @@ -1,149 +0,0 @@ -package handler - -import ( - "net/http" - - "github.com/spiral/roadrunner/v2/plugins/http/config" -) - -// MaxLevel defines maximum tree depth for incoming request data and files. -const MaxLevel = 127 - -type dataTree map[string]interface{} -type fileTree map[string]interface{} - -// parseData parses incoming request body into data tree. -func parseData(r *http.Request) dataTree { - data := make(dataTree) - if r.PostForm != nil { - for k, v := range r.PostForm { - data.push(k, v) - } - } - - if r.MultipartForm != nil { - for k, v := range r.MultipartForm.Value { - data.push(k, v) - } - } - - return data -} - -// pushes value into data tree. -func (d dataTree) push(k string, v []string) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d dataTree) mount(i []string, v []string) { - if len(i) == 1 { - // single value context (last element) - d[i[0]] = v[len(v)-1] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(dataTree).mount(i[1:], v) - return - } - - d[i[0]] = make(dataTree) - d[i[0]].(dataTree).mount(i[1:], v) -} - -// parse incoming dataTree request into JSON (including contentMultipart form dataTree) -func parseUploads(r *http.Request, cfg config.Uploads) *Uploads { - u := &Uploads{ - cfg: cfg, - tree: make(fileTree), - list: make([]*FileUpload, 0), - } - - for k, v := range r.MultipartForm.File { - files := make([]*FileUpload, 0, len(v)) - for _, f := range v { - files = append(files, NewUpload(f)) - } - - u.list = append(u.list, files...) - u.tree.push(k, files) - } - - return u -} - -// pushes new file upload into it's proper place. -func (d fileTree) push(k string, v []*FileUpload) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d fileTree) mount(i []string, v []*FileUpload) { - if len(i) == 1 { - // single value context - d[i[0]] = v[0] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(fileTree).mount(i[1:], v) - return - } - - d[i[0]] = make(fileTree) - d[i[0]].(fileTree).mount(i[1:], v) -} - -// FetchIndexes parses input name and splits it into separate indexes list. -func FetchIndexes(s string) []string { - var ( - pos int - ch string - keys = make([]string, 1) - ) - - for _, c := range s { - ch = string(c) - switch ch { - case " ": - // ignore all spaces - continue - case "[": - pos = 1 - continue - case "]": - if pos == 1 { - keys = append(keys, "") - } - pos = 2 - default: - if pos == 1 || pos == 2 { - keys = append(keys, "") - } - - keys[len(keys)-1] += ch - pos = 0 - } - } - - return keys -} diff --git a/plugins/http/worker_handler/request.go b/plugins/http/worker_handler/request.go deleted file mode 100644 index 178bc827..00000000 --- a/plugins/http/worker_handler/request.go +++ /dev/null @@ -1,187 +0,0 @@ -package handler - -import ( - "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "strings" - - j "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -const ( - defaultMaxMemory = 32 << 20 // 32 MB - contentNone = iota + 900 - contentStream - contentMultipart - contentFormData -) - -// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. -type Request struct { - // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. - RemoteAddr string `json:"remoteAddr"` - - // Protocol includes HTTP protocol version. - Protocol string `json:"protocol"` - - // Method contains name of HTTP method used for the request. - Method string `json:"method"` - - // URI contains full request URI with scheme and query. - URI string `json:"uri"` - - // Header contains list of request headers. - Header http.Header `json:"headers"` - - // Cookies contains list of request cookies. - Cookies map[string]string `json:"cookies"` - - // RawQuery contains non parsed query string (to be parsed on php end). - RawQuery string `json:"rawQuery"` - - // Parsed indicates that request body has been parsed on RR end. - Parsed bool `json:"parsed"` - - // Uploads contains list of uploaded files, their names, sized and associations with temporary files. - Uploads *Uploads `json:"uploads"` - - // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. - Attributes map[string]interface{} `json:"attributes"` - - // request body can be parsedData or []byte - body interface{} -} - -func fetchIP(pair string) string { - if !strings.ContainsRune(pair, ':') { - return pair - } - - addr, _, _ := net.SplitHostPort(pair) - return addr -} - -// NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) { - req := &Request{ - RemoteAddr: fetchIP(r.RemoteAddr), - Protocol: r.Proto, - Method: r.Method, - URI: uri(r), - Header: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), - } - - for _, c := range r.Cookies() { - if v, err := url.QueryUnescape(c.Value); err == nil { - req.Cookies[c.Name] = v - } - } - - switch req.contentType() { - case contentNone: - return req, nil - - case contentStream: - var err error - req.body, err = ioutil.ReadAll(r.Body) - return req, err - - case contentMultipart: - if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { - return nil, err - } - - req.Uploads = parseUploads(r, cfg) - fallthrough - case contentFormData: - if err := r.ParseForm(); err != nil { - return nil, err - } - - req.body = parseData(r) - } - - req.Parsed = true - return req, nil -} - -// Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Open(log) -} - -// Close clears all temp file uploads -func (r *Request) Close(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Clear(log) -} - -// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open -// files prior to calling this method. -func (r *Request) Payload() (payload.Payload, error) { - p := payload.Payload{} - - var err error - if p.Context, err = json.Marshal(r); err != nil { - return payload.Payload{}, err - } - - if r.Parsed { - if p.Body, err = json.Marshal(r.body); err != nil { - return payload.Payload{}, err - } - } else if r.body != nil { - p.Body = r.body.([]byte) - } - - return p, nil -} - -// contentType returns the payload content type. -func (r *Request) contentType() int { - if r.Method == "HEAD" || r.Method == "OPTIONS" { - return contentNone - } - - ct := r.Header.Get("content-type") - if strings.Contains(ct, "application/x-www-form-urlencoded") { - return contentFormData - } - - if strings.Contains(ct, "multipart/form-data") { - return contentMultipart - } - - return contentStream -} - -// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func uri(r *http.Request) string { - if r.URL.Host != "" { - return r.URL.String() - } - if r.TLS != nil { - return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) - } - - return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} diff --git a/plugins/http/worker_handler/response.go b/plugins/http/worker_handler/response.go deleted file mode 100644 index 1763d304..00000000 --- a/plugins/http/worker_handler/response.go +++ /dev/null @@ -1,105 +0,0 @@ -package handler - -import ( - "io" - "net/http" - "strings" - "sync" - - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Response handles PSR7 response logic. -type Response struct { - // Status contains response status. - Status int `json:"status"` - - // Header contains list of response headers. - Headers map[string][]string `json:"headers"` - - // associated Body payload. - Body interface{} - sync.Mutex -} - -// NewResponse creates new response based on given pool payload. -func NewResponse(p payload.Payload) (*Response, error) { - r := &Response{Body: p.Body} - if err := json.Unmarshal(p.Context, r); err != nil { - return nil, err - } - - return r, nil -} - -// Write writes response headers, status and body into ResponseWriter. -func (r *Response) Write(w http.ResponseWriter) error { - // INFO map is the reference type in golang - p := handlePushHeaders(r.Headers) - if pusher, ok := w.(http.Pusher); ok { - for _, v := range p { - err := pusher.Push(v, nil) - if err != nil { - return err - } - } - } - - handleTrailers(r.Headers) - for n, h := range r.Headers { - for _, v := range h { - w.Header().Add(n, v) - } - } - - w.WriteHeader(r.Status) - - if data, ok := r.Body.([]byte); ok { - _, err := w.Write(data) - if err != nil { - return handleWriteError(err) - } - } - - if rc, ok := r.Body.(io.Reader); ok { - if _, err := io.Copy(w, rc); err != nil { - return err - } - } - - return nil -} - -func handlePushHeaders(h map[string][]string) []string { - var p []string - pushHeader, ok := h[http2pushHeaderKey] - if !ok { - return p - } - - p = append(p, pushHeader...) - - delete(h, http2pushHeaderKey) - - return p -} - -func handleTrailers(h map[string][]string) { - trailers, ok := h[TrailerHeaderKey] - if !ok { - return - } - - for _, tr := range trailers { - for _, n := range strings.Split(tr, ",") { - n = strings.Trim(n, "\t ") - if v, ok := h[n]; ok { - h["Trailer:"+n] = v - - delete(h, n) - } - } - } - - delete(h, TrailerHeaderKey) -} diff --git a/plugins/http/worker_handler/uploads.go b/plugins/http/worker_handler/uploads.go deleted file mode 100644 index e695000e..00000000 --- a/plugins/http/worker_handler/uploads.go +++ /dev/null @@ -1,159 +0,0 @@ -package handler - -import ( - "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - - "io" - "io/ioutil" - "mime/multipart" - "os" - "sync" -) - -const ( - // UploadErrorOK - no error, the file uploaded with success. - UploadErrorOK = 0 - - // UploadErrorNoFile - no file was uploaded. - UploadErrorNoFile = 4 - - // UploadErrorNoTmpDir - missing a temporary folder. - UploadErrorNoTmpDir = 6 - - // UploadErrorCantWrite - failed to write file to disk. - UploadErrorCantWrite = 7 - - // UploadErrorExtension - forbidden file extension. - UploadErrorExtension = 8 -) - -// Uploads tree manages uploaded files tree and temporary files. -type Uploads struct { - // associated temp directory and forbidden extensions. - cfg config.Uploads - - // pre processed data tree for Uploads. - tree fileTree - - // flat list of all file Uploads. - list []*FileUpload -} - -// MarshalJSON marshal tree tree into JSON. -func (u *Uploads) MarshalJSON() ([]byte, error) { - return json.Marshal(u.tree) -} - -// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors -// will be handled individually. -func (u *Uploads) Open(log logger.Logger) { - var wg sync.WaitGroup - for _, f := range u.list { - wg.Add(1) - go func(f *FileUpload) { - defer wg.Done() - err := f.Open(u.cfg) - if err != nil && log != nil { - log.Error("error opening the file", "err", err) - } - }(f) - } - - wg.Wait() -} - -// Clear deletes all temporary files. -func (u *Uploads) Clear(log logger.Logger) { - for _, f := range u.list { - if f.TempFilename != "" && exists(f.TempFilename) { - err := os.Remove(f.TempFilename) - if err != nil && log != nil { - log.Error("error removing the file", "err", err) - } - } - } -} - -// FileUpload represents singular file NewUpload. -type FileUpload struct { - // ID contains filename specified by the client. - Name string `json:"name"` - - // Mime contains mime-type provided by the client. - Mime string `json:"mime"` - - // Size of the uploaded file. - Size int64 `json:"size"` - - // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php - Error int `json:"error"` - - // TempFilename points to temporary file location. - TempFilename string `json:"tmpName"` - - // associated file header - header *multipart.FileHeader -} - -// NewUpload wraps net/http upload into PRS-7 compatible structure. -func NewUpload(f *multipart.FileHeader) *FileUpload { - return &FileUpload{ - Name: f.Filename, - Mime: f.Header.Get("Content-Type"), - Error: UploadErrorOK, - header: f, - } -} - -// Open moves file content into temporary file available for PHP. -// NOTE: -// There is 2 deferred functions, and in case of getting 2 errors from both functions -// error from close of temp file would be overwritten by error from the main file -// STACK -// DEFER FILE CLOSE (2) -// DEFER TMP CLOSE (1) -func (f *FileUpload) Open(cfg config.Uploads) (err error) { - if cfg.Forbids(f.Name) { - f.Error = UploadErrorExtension - return nil - } - - file, err := f.header.Open() - if err != nil { - f.Error = UploadErrorNoFile - return err - } - - defer func() { - // close the main file - err = file.Close() - }() - - tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") - if err != nil { - // most likely cause of this issue is missing tmp dir - f.Error = UploadErrorNoTmpDir - return err - } - - f.TempFilename = tmp.Name() - defer func() { - // close the temp file - err = tmp.Close() - }() - - if f.Size, err = io.Copy(tmp, file); err != nil { - f.Error = UploadErrorCantWrite - } - - return err -} - -// exists if file exists. -func exists(path string) bool { - if _, err := os.Stat(path); os.IsNotExist(err) { - return false - } - return true -} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 2e2df527..0f647cb1 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" ) @@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit if b == nil { return errors.E(op, errors.NoSuchBucket) } - err := b.Delete([]byte(k)) + err := b.Delete(utils.AsBytes(k)) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 17b06fa0..02281ed5 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error { memcachedItem := &memcache.Item{ Key: items[i].Key, // unsafe convert - Value: []byte(items[i].Value), + Value: utils.AsBytes(items[i].Value), Flags: 0, } diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index 1e0d03d4..c2494ee7 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -9,6 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return []byte(data.(kv.Item).Value), nil + return utils.AsBytes(data.(kv.Item).Value), nil } return nil, nil } diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go index 484cc23e..479aa565 100644 --- a/plugins/logger/std_log_adapter.go +++ b/plugins/logger/std_log_adapter.go @@ -1,7 +1,7 @@ package logger import ( - "unsafe" + "github.com/spiral/roadrunner/v2/utils" ) // StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output @@ -12,7 +12,7 @@ type StdLogAdapter struct { // Write io.Writer interface implementation func (s *StdLogAdapter) Write(p []byte) (n int, err error) { - s.log.Error("server internal error", "message", toString(p)) + s.log.Error("server internal error", "message", utils.AsString(p)) return len(p), nil } @@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter { return logAdapter } - -// unsafe, but lightning fast []byte to string conversion -func toString(data []byte) string { - return *(*string)(unsafe.Pointer(&data)) -} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 2eab7043..f0011690 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -78,5 +78,4 @@ func (s *Plugin) Name() string { } // Available interface implementation -func (s *Plugin) Available() { -} +func (s *Plugin) Available() {} |