summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.gitignore1
-rw-r--r--pkg/worker_handler/constants.go (renamed from plugins/http/worker_handler/constants.go)0
-rw-r--r--pkg/worker_handler/errors.go (renamed from plugins/http/worker_handler/errors.go)0
-rw-r--r--pkg/worker_handler/errors_windows.go (renamed from plugins/http/worker_handler/errors_windows.go)0
-rw-r--r--pkg/worker_handler/handler.go (renamed from plugins/http/worker_handler/handler.go)2
-rw-r--r--pkg/worker_handler/parse.go (renamed from plugins/http/worker_handler/parse.go)0
-rw-r--r--pkg/worker_handler/request.go (renamed from plugins/http/worker_handler/request.go)0
-rw-r--r--pkg/worker_handler/response.go (renamed from plugins/http/worker_handler/response.go)0
-rw-r--r--pkg/worker_handler/uploads.go (renamed from plugins/http/worker_handler/uploads.go)0
-rw-r--r--plugins/broadcast/config.go12
-rw-r--r--plugins/broadcast/doc/.rr-broadcast.yaml29
-rw-r--r--plugins/broadcast/doc/broadcast.drawio151
-rw-r--r--plugins/broadcast/interface.go36
-rw-r--r--plugins/broadcast/memory/config.go6
-rw-r--r--plugins/broadcast/memory/driver.go39
-rw-r--r--plugins/broadcast/memory/plugin.go67
-rw-r--r--plugins/broadcast/plugin.go95
-rw-r--r--plugins/broadcast/redis/driver.go1
-rw-r--r--plugins/broadcast/redis/plugin.go1
-rw-r--r--plugins/broadcast/redis/storage.go1
-rw-r--r--plugins/broadcast/rpc.go26
-rw-r--r--plugins/broadcast/ws/config.go1
-rw-r--r--plugins/broadcast/ws/plugin.go1
-rw-r--r--plugins/broadcast/ws/subscriber.go1
-rw-r--r--plugins/http/serve.go6
-rw-r--r--plugins/kv/drivers/boltdb/driver.go3
-rw-r--r--plugins/kv/drivers/memcached/driver.go3
-rw-r--r--plugins/kv/drivers/memory/driver.go3
-rw-r--r--plugins/logger/std_log_adapter.go9
-rw-r--r--plugins/redis/plugin.go3
-rw-r--r--tests/plugins/http/handler_test.go64
-rw-r--r--tests/plugins/http/parse_test.go6
-rw-r--r--tests/plugins/http/response_test.go18
-rw-r--r--tests/plugins/http/uploads_test.go10
-rwxr-xr-xutils/network.go3
35 files changed, 532 insertions, 66 deletions
diff --git a/.gitignore b/.gitignore
index 9a9a07b6..beb3f885 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ tests/vendor/
.rr-sample.yaml
cmd
rr
+**/old
diff --git a/plugins/http/worker_handler/constants.go b/pkg/worker_handler/constants.go
index 3355d9c2..3355d9c2 100644
--- a/plugins/http/worker_handler/constants.go
+++ b/pkg/worker_handler/constants.go
diff --git a/plugins/http/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..5fa8e64e 100644
--- a/plugins/http/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
diff --git a/plugins/http/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..390cc7d1 100644
--- a/plugins/http/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
diff --git a/plugins/http/worker_handler/handler.go b/pkg/worker_handler/handler.go
index be53fc12..d98cdef0 100644
--- a/plugins/http/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serve_http")
start := time.Now()
// validating request size
diff --git a/plugins/http/worker_handler/parse.go b/pkg/worker_handler/parse.go
index 2790da2a..2790da2a 100644
--- a/plugins/http/worker_handler/parse.go
+++ b/pkg/worker_handler/parse.go
diff --git a/plugins/http/worker_handler/request.go b/pkg/worker_handler/request.go
index 178bc827..178bc827 100644
--- a/plugins/http/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
diff --git a/plugins/http/worker_handler/response.go b/pkg/worker_handler/response.go
index 1763d304..1763d304 100644
--- a/plugins/http/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
diff --git a/plugins/http/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
index e695000e..e695000e 100644
--- a/plugins/http/worker_handler/uploads.go
+++ b/pkg/worker_handler/uploads.go
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..92f7951d
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,12 @@
+package broadcast
+
+type Config struct {
+}
+
+func (c *Config) InitDefaults() {
+
+}
+
+func (c *Config) Valid() error {
+ return nil
+}
diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml
new file mode 100644
index 00000000..8b0eef20
--- /dev/null
+++ b/plugins/broadcast/doc/.rr-broadcast.yaml
@@ -0,0 +1,29 @@
+broadcast:
+ # path to enable web-socket handler middleware
+ path: /ws
+
+ # optional, redis broker configuration
+ redis:
+ addrs:
+ - "localhost:6379"
+ # if a MasterName is passed a sentinel-backed FailoverClient will be returned
+ master_name: ""
+ username: ""
+ password: ""
+ db: 0
+ sentinel_password: ""
+ route_by_latency: false
+ route_randomly: false
+ dial_timeout: 0 # accepted values [1s, 5m, 3h]
+ max_retries: 1
+ min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ pool_size: 0
+ min_idle_conns: 0
+ max_conn_age: 0 # accepted values [1s, 5m, 3h]
+ read_timeout: 0 # accepted values [1s, 5m, 3h]
+ write_timeout: 0 # accepted values [1s, 5m, 3h]
+ pool_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_check_freq: 0 # accepted values [1s, 5m, 3h]
+ read_only: false
diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio
new file mode 100644
index 00000000..2339f5b1
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast.drawio
@@ -0,0 +1,151 @@
+<mxfile>
+ <diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">
+ <mxGraphModel dx="1582" dy="1094" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1920" pageHeight="1200" math="0" shadow="0">
+ <root>
+ <mxCell id="0"/>
+ <mxCell id="1" parent="0"/>
+ <mxCell id="y4MLTYBancT3lkQri0nA-9" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry">
+ <mxPoint x="620" y="440" as="targetPoint"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-15" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="y4MLTYBancT3lkQri0nA-2" edge="1">
+ <mxGeometry relative="1" as="geometry">
+ <Array as="points">
+ <mxPoint x="460" y="240"/>
+ <mxPoint x="270" y="240"/>
+ </Array>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-16" value="4" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=1;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-1" target="34_DfmtK1x9xla_BCLYV-17" edge="1">
+ <mxGeometry x="0.0526" y="10" relative="1" as="geometry">
+ <mxPoint x="459.72413793103465" y="510" as="targetPoint"/>
+ <mxPoint x="-10" y="10" as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-1" value="BROADCAST HUB (PLUGIN)" style="whiteSpace=wrap;html=1;fillColor=#dae8fc;strokeColor=#6c8ebf;fontColor=#000000;" parent="1" vertex="1">
+ <mxGeometry x="430" y="410" width="120" height="60" as="geometry"/>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-5" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-19" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-2" target="34_DfmtK1x9xla_BCLYV-1" edge="1">
+ <mxGeometry relative="1" as="geometry">
+ <Array as="points">
+ <mxPoint x="240" y="160"/>
+ <mxPoint x="696" y="160"/>
+ </Array>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-22" value="6" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-19" vertex="1" connectable="0">
+ <mxGeometry x="-0.2172" relative="1" as="geometry">
+ <mxPoint x="108.62" y="-10" as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-2" value="WS" style="whiteSpace=wrap;html=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1">
+ <mxGeometry x="210" y="290" width="120" height="40" as="geometry"/>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-23" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-3" edge="1">
+ <mxGeometry relative="1" as="geometry">
+ <mxPoint x="50" y="440.2068965517242" as="targetPoint"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-3" value="REDIS" style="whiteSpace=wrap;html=1;fillColor=#fff2cc;strokeColor=#d6b656;fontColor=#000000;" parent="1" vertex="1">
+ <mxGeometry x="210" y="420" width="120" height="40" as="geometry"/>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-7" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-4" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-4" value="IN-MEMORY" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
+ <mxGeometry x="210" y="560" width="120" height="40" as="geometry"/>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-8" value="Collects sub-plugins" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
+ <mxGeometry x="310" y="370" width="120" height="20" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-15" value="3" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.75;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;" parent="1" source="y4MLTYBancT3lkQri0nA-10" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry x="0.1429" y="15" relative="1" as="geometry">
+ <mxPoint as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="y4MLTYBancT3lkQri0nA-10" value="RPC BUS" style="whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=#9673a6;fontColor=#000000;" parent="1" vertex="1">
+ <mxGeometry x="620" y="410" width="120" height="60" as="geometry"/>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-6" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;entryX=0.75;entryY=0;entryDx=0;entryDy=0;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" source="CDYlmZ7dxupAKddLQDts-1" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-8" value="Connect to the GO endpoint (from the config)" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="CDYlmZ7dxupAKddLQDts-6" vertex="1" connectable="0">
+ <mxGeometry x="-0.6679" y="-2" relative="1" as="geometry">
+ <mxPoint x="-33.3" y="-13" as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-1" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Browser.svg;" parent="1" vertex="1">
+ <mxGeometry x="652.5" y="30" width="87.5" height="70" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-12" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="y4MLTYBancT3lkQri0nA-10" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-20" value="2" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-12" vertex="1" connectable="0">
+ <mxGeometry x="0.2355" y="-1" relative="1" as="geometry">
+ <mxPoint x="9" y="12.69" as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-3" value="PUBLISH" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
+ <mxGeometry x="636.25" y="568" width="87.5" height="30" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-11" value="1" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-5" target="CDYlmZ7dxupAKddLQDts-3" edge="1">
+ <mxGeometry x="0.2" y="-10" relative="1" as="geometry">
+ <Array as="points">
+ <mxPoint x="680" y="620"/>
+ <mxPoint x="680" y="620"/>
+ </Array>
+ <mxPoint as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-5" value="PHP" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
+ <mxGeometry x="620" y="628" width="120" height="30" as="geometry"/>
+ </mxCell>
+ <mxCell id="CDYlmZ7dxupAKddLQDts-9" value="Save the websocket connection" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;" parent="1" vertex="1">
+ <mxGeometry x="280" y="220" width="180" height="20" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-1" value="BROKER?? -&amp;gt; Subscriber" style="whiteSpace=wrap;html=1;" parent="1" vertex="1">
+ <mxGeometry x="653" y="100" width="87" height="30" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-9" value="&lt;p style=&quot;margin: 0px ; margin-top: 4px ; text-align: center ; text-decoration: underline&quot;&gt;&lt;b&gt;Structure&lt;/b&gt;&lt;/p&gt;&lt;hr&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Topic = string&lt;br&gt;Payload = raw&lt;br&gt;&lt;/p&gt;&lt;p style=&quot;margin: 0px ; margin-left: 8px&quot;&gt;Broker = ws (from config) (hardcoded)&lt;br&gt;&lt;/p&gt;" style="verticalAlign=top;align=left;overflow=fill;fontSize=12;fontFamily=Helvetica;html=1;" parent="1" vertex="1">
+ <mxGeometry x="780" y="543" width="220" height="80" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-10" value="" style="endArrow=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="CDYlmZ7dxupAKddLQDts-3" target="34_DfmtK1x9xla_BCLYV-9" edge="1">
+ <mxGeometry width="50" height="50" relative="1" as="geometry">
+ <mxPoint x="740" y="608" as="sourcePoint"/>
+ <mxPoint x="790" y="558" as="targetPoint"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-14" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-13" target="y4MLTYBancT3lkQri0nA-1" edge="1">
+ <mxGeometry relative="1" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-13" value="AMQP" style="whiteSpace=wrap;html=1;fillColor=#ffe6cc;strokeColor=#d79b00;fontColor=#000000;" parent="1" vertex="1">
+ <mxGeometry x="210" y="658" width="120" height="40" as="geometry"/>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-18" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;" parent="1" source="34_DfmtK1x9xla_BCLYV-17" target="y4MLTYBancT3lkQri0nA-2" edge="1">
+ <mxGeometry relative="1" as="geometry">
+ <Array as="points">
+ <mxPoint x="160" y="520"/>
+ <mxPoint x="160" y="380"/>
+ <mxPoint x="270" y="380"/>
+ </Array>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-21" value="5" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="34_DfmtK1x9xla_BCLYV-18" vertex="1" connectable="0">
+ <mxGeometry x="-0.317" y="3" relative="1" as="geometry">
+ <mxPoint x="-7" y="-10.34" as="offset"/>
+ </mxGeometry>
+ </mxCell>
+ <mxCell id="34_DfmtK1x9xla_BCLYV-17" value="&lt;ul&gt;&lt;li&gt;Check registered topics&lt;/li&gt;&lt;li&gt;Redirect to driver&lt;/li&gt;&lt;/ul&gt;" style="whiteSpace=wrap;html=1;align=left;" parent="1" vertex="1">
+ <mxGeometry x="240" y="500" width="175" height="40" as="geometry"/>
+ </mxCell>
+ </root>
+ </mxGraphModel>
+ </diagram>
+</mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..716b3aac
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,36 @@
+package broadcast
+
+import "encoding/json"
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(upstream chan *Message, topics ...string) error
+
+ // SubscribePattern broker to pattern.
+ SubscribePattern(upstream chan *Message, pattern string) error
+
+ // Unsubscribe broker from one or multiple topics.
+ Unsubscribe(upstream chan *Message, topics ...string) error
+
+ // UnsubscribePattern broker from pattern.
+ UnsubscribePattern(upstream chan *Message, pattern string) error
+}
+
+type Storage interface {
+ Store()
+}
+
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages ...*Message) error
+}
+
+// Message represent single message.
+type Message struct {
+ // Topic message been pushed into.
+ Topic string `json:"topic"`
+
+ // Payload to be broadcasted. Must be valid json when transferred over RPC.
+ Payload json.RawMessage `json:"payload"`
+}
diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go
new file mode 100644
index 00000000..e80695bc
--- /dev/null
+++ b/plugins/broadcast/memory/config.go
@@ -0,0 +1,6 @@
+package memory
+
+// Config for the memory driver is empty, it's just a placeholder
+type Config struct{}
+
+func (c *Config) InitDefaults() {}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
new file mode 100644
index 00000000..2eb45c8e
--- /dev/null
+++ b/plugins/broadcast/memory/driver.go
@@ -0,0 +1,39 @@
+package memory
+
+import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+
+type Driver struct {
+}
+
+func NewInMemoryDriver() broadcast.Subscriber {
+ b := &Driver{}
+ return b
+}
+
+func (d *Driver) Serve() error {
+ panic("implement me")
+}
+
+func (d *Driver) Stop() {
+ panic("implement me")
+}
+
+func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+}
+
+func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}
+
+func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+}
+
+func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}
+
+func (d *Driver) Publish(messages ...*broadcast.Message) error {
+ panic("implement me")
+}
diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go
new file mode 100644
index 00000000..2bd894a0
--- /dev/null
+++ b/plugins/broadcast/memory/plugin.go
@@ -0,0 +1,67 @@
+package memory
+
+import (
+ "fmt"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+ SectionName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("memory_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("memory_plugin_serve")
+ errCh := make(chan error)
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ // broadcast.memory
+ return fmt.Sprintf("%s.%s", PluginName, SectionName)
+}
+
+func (p *Plugin) Publish(msg []*broadcast.Message) error {
+ return nil
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..7ad9e2ae
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,95 @@
+package broadcast
+
+import (
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+)
+
+type Plugin struct {
+ broker Subscriber
+ driver Storage
+
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error)
+
+ // if there are no brokers, return nil
+ if p.broker == nil {
+ errCh <- errors.E(op, errors.Str("no broker detected"))
+ return errCh
+ }
+
+ // start the underlying broker
+ go func() {
+ // err := p.broker.Serve()
+ // if err != nil {
+ // errCh <- errors.E(op, err)
+ // }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectBroker,
+ }
+}
+
+func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) {
+ p.broker = broker
+}
+
+func (p *Plugin) RPC() interface{} {
+ // create an RPC service for the collects
+ r := &rpc{
+ log: p.log,
+ svc: p,
+ }
+ return r
+}
+
+func (p *Plugin) Publish(msg []*Message) error {
+ const op = errors.Op("broadcast_plugin_publish")
+ return nil
+}
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
new file mode 100644
index 00000000..65a229e1
--- /dev/null
+++ b/plugins/broadcast/redis/driver.go
@@ -0,0 +1 @@
+package redis
diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go
new file mode 100644
index 00000000..65a229e1
--- /dev/null
+++ b/plugins/broadcast/redis/plugin.go
@@ -0,0 +1 @@
+package redis
diff --git a/plugins/broadcast/redis/storage.go b/plugins/broadcast/redis/storage.go
new file mode 100644
index 00000000..65a229e1
--- /dev/null
+++ b/plugins/broadcast/redis/storage.go
@@ -0,0 +1 @@
+package redis
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..948fd7ae
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,26 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ log logger.Logger
+ svc *Plugin
+}
+
+func (r *rpc) Publish(msg []*Message, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+ err := r.svc.Publish(msg)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync() {
+
+}
diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go
new file mode 100644
index 00000000..98592950
--- /dev/null
+++ b/plugins/broadcast/ws/config.go
@@ -0,0 +1 @@
+package ws
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
new file mode 100644
index 00000000..98592950
--- /dev/null
+++ b/plugins/broadcast/ws/plugin.go
@@ -0,0 +1 @@
+package ws
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
new file mode 100644
index 00000000..98592950
--- /dev/null
+++ b/plugins/broadcast/ws/subscriber.go
@@ -0,0 +1 @@
+package ws
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 78796322..734860f5 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -21,7 +21,7 @@ func (s *Plugin) serveHTTP(errCh chan error) {
if s.http == nil {
return
}
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serveHTTP")
if len(s.mdwr) > 0 {
applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
@@ -43,7 +43,7 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
if s.https == nil {
return
}
- const op = errors.Op("http_plugin_serve_https")
+ const op = errors.Op("serveHTTPS")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
}
@@ -70,7 +70,7 @@ func (s *Plugin) serveFCGI(errCh chan error) {
if s.fcgi == nil {
return
}
- const op = errors.Op("http_plugin_serve_fcgi")
+ const op = errors.Op("serveFCGI")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 2e2df527..0f647cb1 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
@@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- err := b.Delete([]byte(k))
+ err := b.Delete(utils.AsBytes(k))
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 17b06fa0..02281ed5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: []byte(items[i].Value),
+ Value: utils.AsBytes(items[i].Value),
Flags: 0,
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 1e0d03d4..c2494ee7 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(kv.Item).Value), nil
}
return nil, nil
}
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index 484cc23e..479aa565 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -1,7 +1,7 @@
package logger
import (
- "unsafe"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output
@@ -12,7 +12,7 @@ type StdLogAdapter struct {
// Write io.Writer interface implementation
func (s *StdLogAdapter) Write(p []byte) (n int, err error) {
- s.log.Error("server internal error", "message", toString(p))
+ s.log.Error("server internal error", "message", utils.AsString(p))
return len(p), nil
}
@@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter {
return logAdapter
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 2eab7043..f0011690 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -78,5 +78,4 @@ func (s *Plugin) Name() string {
}
// Available interface implementation
-func (s *Plugin) Available() {
-}
+func (s *Plugin) Available() {}
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 575fe656..4c7ab215 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -12,8 +12,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
"net/http"
@@ -35,7 +35,7 @@ func TestHandler_Echo(t *testing.T) {
t.Fatal(err)
}
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -66,7 +66,7 @@ func TestHandler_Echo(t *testing.T) {
}
func Test_HandlerErrors(t *testing.T) {
- _, err := handler.NewHandler(1024, config.Uploads{
+ _, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, nil)
@@ -89,7 +89,7 @@ func TestHandler_Headers(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -150,7 +150,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -210,7 +210,7 @@ func TestHandler_User_Agent(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -270,7 +270,7 @@ func TestHandler_Cookies(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -335,7 +335,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -399,7 +399,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -459,7 +459,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -519,7 +519,7 @@ func TestHandler_FormData_POST(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -592,7 +592,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -665,7 +665,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -737,7 +737,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -809,7 +809,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -881,7 +881,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -995,7 +995,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1109,7 +1109,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1225,7 +1225,7 @@ func TestHandler_Error(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1271,7 +1271,7 @@ func TestHandler_Error2(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1317,7 +1317,7 @@ func TestHandler_Error3(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1, config.Uploads{
+ h, err := handler2.NewHandler(1, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1376,7 +1376,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1401,7 +1401,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
gotresp := make(chan interface{})
h.AddListener(func(event interface{}) {
switch t := event.(type) {
- case handler.ResponseEvent:
+ case handler2.ResponseEvent:
if t.Elapsed() > 0 {
close(gotresp)
}
@@ -1437,7 +1437,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1462,7 +1462,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
gotresp := make(chan interface{})
h.AddListener(func(event interface{}) {
switch tp := event.(type) {
- case handler.ResponseEvent:
+ case handler2.ResponseEvent:
if tp.Elapsed() > time.Second {
close(gotresp)
}
@@ -1497,7 +1497,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
@@ -1522,7 +1522,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
goterr := make(chan interface{})
h.AddListener(func(event interface{}) {
switch tp := event.(type) {
- case handler.ErrorEvent:
+ case handler2.ErrorEvent:
if tp.Elapsed() > 0 {
close(goterr)
}
@@ -1571,7 +1571,7 @@ func TestHandler_IP(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, cidrs, p)
@@ -1632,7 +1632,7 @@ func TestHandler_XRealIP(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, cidrs, p)
@@ -1698,7 +1698,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, cidrs, p)
@@ -1763,7 +1763,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, cidrs, p)
@@ -1811,7 +1811,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
p.Destroy(context.Background())
}()
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, p)
diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go
index 15c82839..832ecde4 100644
--- a/tests/plugins/http/parse_test.go
+++ b/tests/plugins/http/parse_test.go
@@ -3,7 +3,7 @@ package http
import (
"testing"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler"
)
var samples = []struct {
@@ -21,7 +21,7 @@ var samples = []struct {
func Test_FetchIndexes(t *testing.T) {
for i := 0; i < len(samples); i++ {
- r := handler.FetchIndexes(samples[i].in)
+ r := handler2.FetchIndexes(samples[i].in)
if !same(r, samples[i].out) {
t.Errorf("got %q, want %q", r, samples[i].out)
}
@@ -31,7 +31,7 @@ func Test_FetchIndexes(t *testing.T) {
func BenchmarkConfig_FetchIndexes(b *testing.B) {
for _, tt := range samples {
for n := 0; n < b.N; n++ {
- r := handler.FetchIndexes(tt.in)
+ r := handler2.FetchIndexes(tt.in)
if !same(r, tt.out) {
b.Fail()
}
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 3564d9cd..648b6255 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"github.com/spiral/roadrunner/v2/pkg/payload"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/stretchr/testify/assert"
)
@@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{Context: []byte(`invalid payload`)})
+ r, err := handler2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -93,7 +93,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -114,7 +114,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -129,7 +129,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -139,7 +139,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
w := &testWriter{h: http.Header(make(map[string][]string))}
assert.NoError(t, r.Write(w))
- assert.Nil(t, w.h[handler.TrailerHeaderKey])
+ assert.Nil(t, w.h[handler2.TrailerHeaderKey])
assert.Nil(t, w.h["foo"]) //nolint:staticcheck
assert.Nil(t, w.h["baz"]) //nolint:staticcheck
@@ -148,7 +148,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := handler.NewResponse(payload.Payload{
+ r, err := handler2.NewResponse(payload.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index 5c39589c..5381d30e 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -18,8 +18,8 @@ import (
j "github.com/json-iterator/go"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler2 "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
)
@@ -40,7 +40,7 @@ func TestHandler_Upload_File(t *testing.T) {
t.Fatal(err)
}
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, pool)
@@ -123,7 +123,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
t.Fatal(err)
}
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{},
}, nil, pool)
@@ -206,7 +206,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
t.Fatal(err)
}
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: "-------",
Forbid: []string{},
}, nil, pool)
@@ -289,7 +289,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
t.Fatal(err)
}
- h, err := handler.NewHandler(1024, config.Uploads{
+ h, err := handler2.NewHandler(1024, config.Uploads{
Dir: os.TempDir(),
Forbid: []string{".go"},
}, nil, pool)
diff --git a/utils/network.go b/utils/network.go
index b73363db..86a7e733 100755
--- a/utils/network.go
+++ b/utils/network.go
@@ -12,7 +12,8 @@ import (
"github.com/valyala/tcplisten"
)
-// - SO_REUSEPORT. This option allows linear scaling server performance
+// CreateListener
+// - SO_REUSEPORT. This option allows linear scaling server performance
// on multi-CPU servers.
// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details.
//