summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.gitignore1
-rwxr-xr-xpkg/payload/payload.go6
-rwxr-xr-xpkg/pool/static_pool.go6
-rwxr-xr-xpkg/pool/static_pool_test.go3
-rw-r--r--pkg/worker_handler/handler.go2
-rw-r--r--plugins/broadcast/config.go12
-rw-r--r--plugins/broadcast/doc/.rr-broadcast.yaml27
-rw-r--r--plugins/broadcast/doc/broadcast.drawio2
-rw-r--r--plugins/broadcast/interface.go (renamed from plugins/broadcast/root/broker.go)0
-rw-r--r--plugins/broadcast/memory/config.go6
-rw-r--r--plugins/broadcast/memory/driver.go42
-rw-r--r--plugins/broadcast/memory/memory.go131
-rw-r--r--plugins/broadcast/memory/memory_test.go80
-rw-r--r--plugins/broadcast/memory/plugin.go71
-rw-r--r--plugins/broadcast/old/redis.go (renamed from plugins/broadcast/redis/redis.go)2
-rw-r--r--plugins/broadcast/old/redis_test.go (renamed from plugins/broadcast/redis/redis_test.go)2
-rw-r--r--plugins/broadcast/plugin.go87
-rw-r--r--plugins/broadcast/redis/driver.go1
-rw-r--r--plugins/broadcast/redis/plugin.go1
-rw-r--r--plugins/broadcast/root/Makefile9
-rw-r--r--plugins/broadcast/root/client.go133
-rw-r--r--plugins/broadcast/root/client_test.go59
-rw-r--r--plugins/broadcast/root/config.go61
-rw-r--r--plugins/broadcast/root/config_test.go60
-rw-r--r--plugins/broadcast/root/router.go170
-rw-r--r--plugins/broadcast/root/rpc.go25
-rw-r--r--plugins/broadcast/root/rpc_test.go72
-rw-r--r--plugins/broadcast/root/service.go85
-rw-r--r--plugins/broadcast/root/service_test.go65
-rw-r--r--plugins/broadcast/root/tests/.rr.yaml2
-rw-r--r--plugins/broadcast/root/tests/Broadcast/BroadcastTest.php56
-rw-r--r--plugins/broadcast/root/tests/Broadcast/MessageTest.php24
-rw-r--r--plugins/broadcast/root/tests/bootstrap.php15
-rw-r--r--plugins/broadcast/root/tests/docker-compose.yml9
-rw-r--r--plugins/broadcast/root/tests/go-client.go78
-rw-r--r--plugins/broadcast/rpc.go26
-rw-r--r--plugins/broadcast/websockets/Makefile2
-rw-r--r--plugins/broadcast/websockets/access_validator.go102
-rw-r--r--plugins/broadcast/websockets/access_validator_test.go35
-rw-r--r--plugins/broadcast/websockets/config.go21
-rw-r--r--plugins/broadcast/websockets/config_test.go34
-rw-r--r--plugins/broadcast/websockets/conn_context.go66
-rw-r--r--plugins/broadcast/websockets/conn_context_test.go28
-rw-r--r--plugins/broadcast/websockets/conn_pool.go125
-rw-r--r--plugins/broadcast/websockets/event.go40
-rw-r--r--plugins/broadcast/websockets/rpc.go17
-rw-r--r--plugins/broadcast/websockets/service.go228
-rw-r--r--plugins/broadcast/websockets/service_test.go706
-rw-r--r--plugins/http/serve.go6
-rw-r--r--plugins/http/static/etag.go4
-rw-r--r--plugins/kv/drivers/boltdb/driver.go3
-rw-r--r--plugins/kv/drivers/memcached/driver.go3
-rw-r--r--plugins/kv/drivers/memory/driver.go3
-rw-r--r--plugins/kv/rpc.go18
-rw-r--r--plugins/logger/std_log_adapter.go4
-rw-r--r--plugins/server/plugin.go9
-rw-r--r--plugins/service/process.go4
-rw-r--r--utils/convert.go34
58 files changed, 343 insertions, 2580 deletions
diff --git a/.gitignore b/.gitignore
index 9a9a07b6..beb3f885 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ tests/vendor/
.rr-sample.yaml
cmd
rr
+**/old
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
index 1fe334eb..e1e45ac1 100755
--- a/pkg/payload/payload.go
+++ b/pkg/payload/payload.go
@@ -1,6 +1,8 @@
package payload
-import "unsafe"
+import (
+ "github.com/spiral/roadrunner/v2/utils"
+)
// Payload carries binary header and body to stack and
// back to the server.
@@ -14,5 +16,5 @@ type Payload struct {
// String returns payload body as string
func (p *Payload) String() string {
- return toString(p.Body)
+ return utils.AsString(p.Body)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e769093c..54192262 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -4,7 +4,6 @@ import (
"context"
"os/exec"
"time"
- "unsafe"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
@@ -12,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StopRequest can be sent by worker to indicate that restart is required.
@@ -153,7 +153,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
// worker want's to be terminated
- if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
+ if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
sp.stopWorker(w)
return sp.Exec(p)
}
@@ -183,7 +183,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
}
// worker want's to be terminated
- if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
+ if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
sp.stopWorker(w)
return sp.execWithTTL(ctx, p)
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 69527815..bf7f10e0 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -16,6 +16,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/assert"
)
@@ -653,7 +654,7 @@ func BenchmarkToStringUnsafe(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
- res := toString(testPayload)
+ res := utils.AsString(testPayload)
_ = res
}
}
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
index be53fc12..d98cdef0 100644
--- a/pkg/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serve_http")
start := time.Now()
// validating request size
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index aa270f64..47dab4f9 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -1 +1,13 @@
package broadcast
+
+type Config struct {
+
+}
+
+func (c *Config) InitDefaults() {
+
+}
+
+func (c *Config) Valid() error {
+ return nil
+}
diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml
index a0a2ad5e..8b0eef20 100644
--- a/plugins/broadcast/doc/.rr-broadcast.yaml
+++ b/plugins/broadcast/doc/.rr-broadcast.yaml
@@ -1,10 +1,29 @@
-# broadcast service configuration.rr.yaml
broadcast:
# path to enable web-socket handler middleware
path: /ws
# optional, redis broker configuration
redis:
- addr: "localhost:6379"
- passsword: ""
- db: 0
+ addrs:
+ - "localhost:6379"
+ # if a MasterName is passed a sentinel-backed FailoverClient will be returned
+ master_name: ""
+ username: ""
+ password: ""
+ db: 0
+ sentinel_password: ""
+ route_by_latency: false
+ route_randomly: false
+ dial_timeout: 0 # accepted values [1s, 5m, 3h]
+ max_retries: 1
+ min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ pool_size: 0
+ min_idle_conns: 0
+ max_conn_age: 0 # accepted values [1s, 5m, 3h]
+ read_timeout: 0 # accepted values [1s, 5m, 3h]
+ write_timeout: 0 # accepted values [1s, 5m, 3h]
+ pool_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_check_freq: 0 # accepted values [1s, 5m, 3h]
+ read_only: false
diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio
index f9845dc8..5f9d39b2 100644
--- a/plugins/broadcast/doc/broadcast.drawio
+++ b/plugins/broadcast/doc/broadcast.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-05-03T16:49:17.087Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.6 Safari/537.36" etag="RZNtN_6682KfuWpR1T35" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">1ZhdU6MwFIZ/TWd2L9xJCVB72a9VZ6y6Vkd7mcIBsqaECcG2/voNEAqIdXSEddpelLzJCcl78pCGHp6st2eCRMGcu8B6BnK3PTztGUYfY6R+UmWXK0Nk5YIvqKsblcKCvoAWdZyfUBfiWkPJOZM0qosOD0NwZE0jQvBNvZnHWf2uEfGhISwcwprqA3VloNWhgcqKc6B+UNzaQLpmTYrWWogD4vJNRcKzHp4IzmV+td5OgKXuFcbkcb8P1O5HJiCUHwnYmfPLu+WYhM4dZk9/BEXh6GSoxyZ3xYzBVQboIhcy4D4PCZuV6ljwJHQh7RWpUtnmkvNIiX0l/gUpdzqbJJFcSYFcM10LWyofK9fLtKtfli5Nt7rnrLDThXyc6eAOTl9LMU+EA+/Mub83Xy1b4GuQYqfiBDAi6XO9f6LXj79vtw+94VTd2UB6rdtFnvVKN01U70IS4YPUUWWe1EVlGKWUZe8TmdQDfiYs0VMY316PppPR4k7J5/fKQfTj5vL+7OLqZyPp9ZRuAiphEZHMxY0Cu54+jzI24YyLLBa7BE49R+mxFPwJKjW2cworb5+8ZxAStu+nr5kWHWDiV/YWT4hNBcsiBUGFSBsdzmQtB5813DoWdJTDYvdYNkyLy2pdGZaVOkDO0I/uDIEO0fxSQo0GQQ+LVknxrPT7JinZJ43goazo+acdgvZ7qiZI7cUfI+j1g6w1w+3jI6jGT4lT9wThYyAINwi6nU0vWobI8wznze3GtVe2ZXcDi2l8NyyD44Olvt30/x8s5jHAYjZgubg6mc/m17fLloEB+wAwg+EKoW6AsezvBua04a+aOlOHxDj1NlmdRCzxaRg3zFYmyLqjhFE/VNeOMgOUeePUKqpOiSNdsaaumxMHMX0hq6yr1Nko/bufzcwa96xp2peCLM5567djPX5lPR580HqjK+uLAVU3ghvVFRrft7sZQN+1YPDW2h7aA0xa2gwaR7sOzx6qWL4SyM+G5ZsVPPsH</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-05-04T18:10:53.311Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.6 Safari/537.36" etag="-7c-lVUZQ3gXb_2larRL" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7Vpbb+I6EP41SHseQLkHHrl02+rQbbe02uW8mcQJPg0xckyB/vpjJw6JcUC0JMt2dUCK4vEF+5v5ZjwjWuZwsbkmYDm/wz6MWobmb1rmqGUYXd1iTy7YZgKr62SCkCA/E+mFYILeoBBqQrpCPkykgRTjiKKlLPRwHEOPSjJACF7LwwIcyb+6BCFUBBMPRKr0B/LpXEh7hlZ03EAUzvOfNjTRswD5aCFI5sDH65LIvGqZQ4Ixzd4WmyGMOHg5MNm8rwd6dzsjMKanTNhad+On6QDE3pMZvXwnSIv77Z7YG93mJ4Y+A0A0MaFzHOIYRFeFdEDwKvYhX1VjrWLMGOMlE+pM+C+kdCu0CVYUM9GcLiLRCzeI/iy9T/lSHVu0RhuxctrYioZ6WgFAglfEg0eOKMbxc5UmCoyuIV5ASrZsAIERoOhVVjwQ9hPuxu2mPmDEtmJowtSdXM+5pVuavAQFJIRUzCr0xF5K2yhEqfaqNTkcTaPFP66/WS37f/v++PuIJm3dvqQqtY5hl7V5VJdMhWT7s1A6b07LfcW0tJXPC1AUDXGESXo80wewG3hMnlCCX2Cpx/G6cBbsrGZP9WeZUabFI+OMOqyrTwjYlgYsudUkh43PcmTjM6w9X7A33nCPjmcv2Q4+aqHHEHwF0UrAMHi874+G/ckTE988M21pXx7Gz9e33/5SbFm21PUcUThZglRjaxZ5ZKv8qKFU2MUrJBRujjqOXAfmngPIY9i6FDhyJzEvxQxHO2wNkpd4L+AX9Qjvce4fdQgfZ7NxIptrDx5nKdRQGPRjUitTApt/K5mSfvgMHNOSPPvUw6DdrS/3Sr0TGbQfamsD3Pl8DJL4U9CpdgaZn5JBpsKgx6vRbc0kCgLDqww3vjNzbKcZsljGpcnifj6yyOFGb4ws1qcki6WQ5fZb++7q7v5xWjNhoHOAMG5vph2F/uOEsZ1LE6ar4MuOHkGP3/S1ZDVrL6NViOJEAZudmMqIggiFMXv3GD6QgTfguCAPRH3RsUC+nzEOJugNzNKlOLIitWDr2oOWPeJrMZIlGd/0eqA396A33ROhN5qCPt9QORA8sKW0wXO9wQDqvg3dKtvuOa4JagoGSvHhV+YeleWIy9ycCsfu/paVherKzXuCwy+IBZWbrPJVWcmVV2T5Yw7Z8/qeoxL7S1Fv+BIQvNj1ejgOUKgm9xzGMZjBqDmndoBRu8qxmNwyispuocnDFn6QkW2t4zhuT2Jlnr+dWWlsm2bHlBfWTXkRHAQJbKbMqNiBok2QLLNSfIA2nM1llVZEm30lo0Vak08TTcF03SjkI7QI2c4jNGNP8LYikB8yhDEkgO3+64AX/SHpJK/hMbUrRD3sW22jY8vhS3WuXTcfU/aublPeNdf3pYq95Vpv3tNEaacW/1qRqVaDem7p9iyVqpnpw/NgfDu5Oe86UgMBerrdcQyJAbp1KgXMxiigKcD8T4FDYNknUsC8JAN0Nbakd/I/4bKga9XIiuW1jm46MsPOuyo0fxGwVX9183B5X2VYHdOVkKzINd2KTKgxR9VTgJqAV7i7E6/hLMHeC1dSbl4IxwqQF0j5z1KE0d2r5VeVJ7sNpfxHwrukiOsU9QHP8Qj/ydhPKecjUkpsUFqO2QVjje1mjv3LW7q2B3BVyq93eSKsYGy/H2PWLP6qkvmQ4g8/5tV/</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/interface.go
index 923c8105..923c8105 100644
--- a/plugins/broadcast/root/broker.go
+++ b/plugins/broadcast/interface.go
diff --git a/plugins/broadcast/memory/config.go b/plugins/broadcast/memory/config.go
new file mode 100644
index 00000000..840dbb96
--- /dev/null
+++ b/plugins/broadcast/memory/config.go
@@ -0,0 +1,6 @@
+package memory
+
+// Config for the memory driver is empty, it's just a placeholder
+type Config struct {}
+
+func (c *Config) InitDefaults() {}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
new file mode 100644
index 00000000..8a9374c9
--- /dev/null
+++ b/plugins/broadcast/memory/driver.go
@@ -0,0 +1,42 @@
+package memory
+
+import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+
+type Driver struct {
+
+}
+
+func NewInMemoryDriver() broadcast.Broker {
+ b := &Driver{
+
+ }
+ return b
+}
+
+func (d *Driver) Serve() error {
+ panic("implement me")
+}
+
+func (d *Driver) Stop() {
+ panic("implement me")
+}
+
+func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+}
+
+func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}
+
+func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+ panic("implement me")
+}
+
+func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+ panic("implement me")
+}
+
+func (d *Driver) Publish(messages ...*broadcast.Message) error {
+ panic("implement me")
+}
diff --git a/plugins/broadcast/memory/memory.go b/plugins/broadcast/memory/memory.go
deleted file mode 100644
index 5b85d68f..00000000
--- a/plugins/broadcast/memory/memory.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package memory
-
-import (
- "errors"
- "sync/atomic"
-)
-
-// Memory manages broadcasting in memory.
-type Memory struct {
- router *Router
- messages chan *Message
- join, leave chan subscriber
- stop chan interface{}
- stopped int32
-}
-
-// memoryBroker creates new memory based message broker.
-func memoryBroker() *Memory {
- return &Memory{
- router: NewRouter(),
- messages: make(chan *Message),
- join: make(chan subscriber),
- leave: make(chan subscriber),
- stop: make(chan interface{}),
- stopped: 0,
- }
-}
-
-// Serve serves broker.
-func (m *Memory) Serve() error {
- for {
- select {
- case ctx := <-m.join:
- ctx.done <- m.handleJoin(ctx)
- case ctx := <-m.leave:
- ctx.done <- m.handleLeave(ctx)
- case msg := <-m.messages:
- m.router.Dispatch(msg)
- case <-m.stop:
- return nil
- }
- }
-}
-
-func (m *Memory) handleJoin(sub subscriber) (err error) {
- if sub.pattern != "" {
- _, err = m.router.SubscribePattern(sub.upstream, sub.pattern)
- return err
- }
-
- m.router.Subscribe(sub.upstream, sub.topics...)
- return nil
-}
-
-func (m *Memory) handleLeave(sub subscriber) error {
- if sub.pattern != "" {
- m.router.UnsubscribePattern(sub.upstream, sub.pattern)
- return nil
- }
-
- m.router.Unsubscribe(sub.upstream, sub.topics...)
- return nil
-}
-
-// Stop closes the consumption and disconnects broker.
-func (m *Memory) Stop() {
- if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) {
- close(m.stop)
- }
-}
-
-// Subscribe broker to one or multiple channels.
-func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error {
- if atomic.LoadInt32(&m.stopped) == 1 {
- return errors.New("broker has been stopped")
- }
-
- ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
-
- m.join <- ctx
- return <-ctx.done
-}
-
-// SubscribePattern broker to pattern.
-func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error {
- if atomic.LoadInt32(&m.stopped) == 1 {
- return errors.New("broker has been stopped")
- }
-
- ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
-
- m.join <- ctx
- return <-ctx.done
-}
-
-// Unsubscribe broker from one or multiple channels.
-func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error {
- if atomic.LoadInt32(&m.stopped) == 1 {
- return errors.New("broker has been stopped")
- }
-
- ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
-
- m.leave <- ctx
- return <-ctx.done
-}
-
-// UnsubscribePattern broker from pattern.
-func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error {
- if atomic.LoadInt32(&m.stopped) == 1 {
- return errors.New("broker has been stopped")
- }
-
- ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
-
- m.leave <- ctx
- return <-ctx.done
-}
-
-// Publish one or multiple Channel.
-func (m *Memory) Publish(messages ...*Message) error {
- if atomic.LoadInt32(&m.stopped) == 1 {
- return errors.New("broker has been stopped")
- }
-
- for _, msg := range messages {
- m.messages <- msg
- }
-
- return nil
-}
diff --git a/plugins/broadcast/memory/memory_test.go b/plugins/broadcast/memory/memory_test.go
deleted file mode 100644
index 0eb8d03e..00000000
--- a/plugins/broadcast/memory/memory_test.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package memory
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestMemory_Broadcast(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello2")))
- assert.Equal(t, `hello2`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Unsubscribe("topic"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello3")))
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello4")))
- assert.Equal(t, `hello4`, readStr(<-client.Channel()))
-}
-
-func TestMemory_BroadcastPattern(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
-
- assert.NoError(t, client.SubscribePattern("topic/*"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Publish(newMessage("topic/1", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello2")))
- assert.Equal(t, `hello2`, readStr(<-client.Channel()))
-
- assert.NoError(t, br.Broker().Publish(newMessage("different", "hello4")))
- assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello5")))
-
- assert.Equal(t, `hello5`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.UnsubscribePattern("topic/*"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/3", "hello6")))
-
- assert.NoError(t, client.SubscribePattern("topic/*"))
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/4", "hello7")))
- assert.Equal(t, `hello7`, readStr(<-client.Channel()))
-}
-
-func TestMemory_NotActive(t *testing.T) {
- b := memoryBroker()
- b.stopped = 1
-
- assert.Error(t, b.Publish(nil))
- assert.Error(t, b.Subscribe(nil))
- assert.Error(t, b.Unsubscribe(nil))
- assert.Error(t, b.SubscribePattern(nil, ""))
- assert.Error(t, b.UnsubscribePattern(nil, ""))
-}
diff --git a/plugins/broadcast/memory/plugin.go b/plugins/broadcast/memory/plugin.go
new file mode 100644
index 00000000..4ebeb4c8
--- /dev/null
+++ b/plugins/broadcast/memory/plugin.go
@@ -0,0 +1,71 @@
+package memory
+
+import (
+ "fmt"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+ SectionName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg *Config
+
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("memory_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ if !cfg.Has(fmt.Sprintf("%s.%s", PluginName, SectionName)) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("memory_plugin_serve")
+ errCh := make(chan error)
+
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ // broadcast.memory
+ return fmt.Sprintf("%s.%s", PluginName, SectionName)
+}
+
+
+
+func (p *Plugin) Publish(msg []*broadcast.Message) error {
+ return nil
+}
diff --git a/plugins/broadcast/redis/redis.go b/plugins/broadcast/old/redis.go
index 41f48658..62970bc2 100644
--- a/plugins/broadcast/redis/redis.go
+++ b/plugins/broadcast/old/redis.go
@@ -1,4 +1,4 @@
-package redis
+package old
import (
"context"
diff --git a/plugins/broadcast/redis/redis_test.go b/plugins/broadcast/old/redis_test.go
index 37027e01..8148c155 100644
--- a/plugins/broadcast/redis/redis_test.go
+++ b/plugins/broadcast/old/redis_test.go
@@ -1,4 +1,4 @@
-package redis
+package old
import (
"fmt"
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3cedf555..45051a7f 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -1,11 +1,96 @@
package broadcast
+import (
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+)
type Plugin struct {
+ broker Broker
+
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error)
+
+ // if there are no brokers, return nil
+ if p.broker == nil {
+ errCh <- errors.E(op, errors.Str("no broker detected"))
+ return errCh
+ }
+
+ // start the underlying broker
+ go func() {
+ err := p.broker.Serve()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }()
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
}
+// Available interface implementation for the plugin
+func (p * Plugin) Available() {}
-func (p *Plugin) Init() error {
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectBroker,
+ }
+}
+
+func (p *Plugin) CollectBroker(name endure.Named, broker Broker) {
+ p.broker = broker
+}
+
+func (p *Plugin) RPC() interface{} {
+ // create an RPC service for the collects
+ r := &rpc{
+ log: p.log,
+ svc: p,
+ }
+ return r
+}
+
+func (p *Plugin) Publish(msg []*Message) error {
+ const op = errors.Op("broadcast_plugin_publish")
return nil
}
+
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
new file mode 100644
index 00000000..65a229e1
--- /dev/null
+++ b/plugins/broadcast/redis/driver.go
@@ -0,0 +1 @@
+package redis
diff --git a/plugins/broadcast/redis/plugin.go b/plugins/broadcast/redis/plugin.go
new file mode 100644
index 00000000..65a229e1
--- /dev/null
+++ b/plugins/broadcast/redis/plugin.go
@@ -0,0 +1 @@
+package redis
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile
deleted file mode 100644
index d88312d2..00000000
--- a/plugins/broadcast/root/Makefile
+++ /dev/null
@@ -1,9 +0,0 @@
-clean:
- rm -rf rr-jobbroadcast
-install: all
- cp rr-broadcast /usr/local/bin/rr-broadcast
-uninstall:
- rm -f /usr/local/bin/rr-broadcast
-test:
- composer update
- go test -v -race -cover
diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go
deleted file mode 100644
index c5761f94..00000000
--- a/plugins/broadcast/root/client.go
+++ /dev/null
@@ -1,133 +0,0 @@
-package broadcast
-
-import "sync"
-
-// Client subscribes to a given topic and consumes or publish messages to it.
-type Client struct {
- upstream chan *Message
- broker Broker
- mu sync.Mutex
- topics []string
- patterns []string
-}
-
-// Channel returns incoming messages channel.
-func (c *Client) Channel() chan *Message {
- return c.upstream
-}
-
-// Publish message into associated topic or topics.
-func (c *Client) Publish(msg ...*Message) error {
- return c.broker.Publish(msg...)
-}
-
-// Subscribe client to specific topics.
-func (c *Client) Subscribe(topics ...string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- newTopics := make([]string, 0)
- for _, topic := range topics {
- found := false
- for _, e := range c.topics {
- if e == topic {
- found = true
- break
- }
- }
-
- if !found {
- newTopics = append(newTopics, topic)
- }
- }
-
- if len(newTopics) == 0 {
- return nil
- }
-
- c.topics = append(c.topics, newTopics...)
-
- return c.broker.Subscribe(c.upstream, newTopics...)
-}
-
-// SubscribePattern subscribe client to the specific topic pattern.
-func (c *Client) SubscribePattern(pattern string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- for _, g := range c.patterns {
- if g == pattern {
- return nil
- }
- }
-
- c.patterns = append(c.patterns, pattern)
- return c.broker.SubscribePattern(c.upstream, pattern)
-}
-
-// Unsubscribe client from specific topics
-func (c *Client) Unsubscribe(topics ...string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- dropTopics := make([]string, 0)
- for _, topic := range topics {
- for i, e := range c.topics {
- if e == topic {
- c.topics = append(c.topics[:i], c.topics[i+1:]...)
- dropTopics = append(dropTopics, topic)
- }
- }
- }
-
- if len(dropTopics) == 0 {
- return nil
- }
-
- return c.broker.Unsubscribe(c.upstream, dropTopics...)
-}
-
-// UnsubscribePattern client from topic pattern.
-func (c *Client) UnsubscribePattern(pattern string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- for i := range c.patterns {
- if c.patterns[i] == pattern {
- c.patterns = append(c.patterns[:i], c.patterns[i+1:]...)
-
- return c.broker.UnsubscribePattern(c.upstream, pattern)
- }
- }
-
- return nil
-}
-
-// Topics return all the topics client subscribed to.
-func (c *Client) Topics() []string {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- return c.topics
-}
-
-// Patterns return all the patterns client subscribed to.
-func (c *Client) Patterns() []string {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- return c.patterns
-}
-
-// Close the client and consumption.
-func (c *Client) Close() (err error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if len(c.topics) != 0 {
- err = c.broker.Unsubscribe(c.upstream, c.topics...)
- }
-
- close(c.upstream)
- return err
-}
diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go
deleted file mode 100644
index 52a50d57..00000000
--- a/plugins/broadcast/root/client_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package broadcast
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Client_Topics(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.Equal(t, []string{}, client.Topics())
-
- assert.NoError(t, client.Subscribe("topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, client.Subscribe("topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, br.broker.Subscribe(client.upstream, "topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Unsubscribe("topic"))
- assert.NoError(t, client.Unsubscribe("topic"))
- assert.NoError(t, br.broker.Unsubscribe(client.upstream, "topic"))
-
- assert.Equal(t, []string{}, client.Topics())
-}
-
-func Test_Client_Patterns(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.Equal(t, []string{}, client.Patterns())
-
- assert.NoError(t, client.SubscribePattern("topic/*"))
- assert.Equal(t, []string{"topic/*"}, client.Patterns())
-
- assert.NoError(t, br.broker.SubscribePattern(client.upstream, "topic/*"))
- assert.Equal(t, []string{"topic/*"}, client.Patterns())
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.UnsubscribePattern("topic/*"))
- assert.NoError(t, br.broker.UnsubscribePattern(client.upstream, "topic/*"))
-
- assert.Equal(t, []string{}, client.Patterns())
-}
diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go
deleted file mode 100644
index 8c732441..00000000
--- a/plugins/broadcast/root/config.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package broadcast
-
-import (
- "errors"
-
- "github.com/go-redis/redis/v8"
-)
-
-// Config configures the broadcast extension.
-type Config struct {
- // RedisConfig configures redis broker.
- Redis *RedisConfig
-}
-
-// Hydrate reads the configuration values from the source configuration.
-//func (c *Config) Hydrate(cfg service.Config) error {
-// if err := cfg.Unmarshal(c); err != nil {
-// return err
-// }
-//
-// if c.Redis != nil {
-// return c.Redis.isValid()
-// }
-//
-// return nil
-//}
-
-// InitDefaults enables in memory broadcast configuration.
-func (c *Config) InitDefaults() error {
- return nil
-}
-
-// RedisConfig configures redis broker.
-type RedisConfig struct {
- // Addr of the redis server.
- Addr string
-
- // Password to redis server.
- Password string
-
- // DB index.
- DB int
-}
-
-// clusterOptions
-func (cfg *RedisConfig) redisClient() redis.UniversalClient {
- return redis.NewClient(&redis.Options{
- Addr: cfg.Addr,
- Password: cfg.Password,
- PoolSize: 2,
- })
-}
-
-// check if redis config is valid.
-func (cfg *RedisConfig) isValid() error {
- if cfg.Addr == "" {
- return errors.New("redis addr is required")
- }
-
- return nil
-}
diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go
deleted file mode 100644
index 28191c6b..00000000
--- a/plugins/broadcast/root/config_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package broadcast
-
-import (
- "encoding/json"
- "testing"
-
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-type testCfg struct {
- rpc string
- broadcast string
- target string
-}
-
-func (cfg *testCfg) Get(name string) service.Config {
- if name == ID {
- return &testCfg{target: cfg.broadcast}
- }
-
- if name == rpc.ID {
- return &testCfg{target: cfg.rpc}
- }
-
- return nil
-}
-
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- return json.Unmarshal([]byte(cfg.target), out)
-}
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &testCfg{target: `{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_OK(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Redis_Error(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path","redis":{}}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Redis_OK(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
deleted file mode 100644
index 91137f8b..00000000
--- a/plugins/broadcast/root/router.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package broadcast
-
-//import "github.com/gobwas/glob"
-
-// Router performs internal message routing to multiple subscribers.
-type Router struct {
- wildcard map[string]wildcard
- routes map[string][]chan *Message
-}
-
-// wildcard handles number of topics via glob pattern.
-type wildcard struct {
- //glob glob.Glob
- upstream []chan *Message
-}
-
-// helper for blocking join/leave flow
-type subscriber struct {
- upstream chan *Message
- done chan error
- topics []string
- pattern string
-}
-
-// NewRouter creates new topic and pattern router.
-func NewRouter() *Router {
- return &Router{
- wildcard: make(map[string]wildcard),
- routes: make(map[string][]chan *Message),
- }
-}
-
-// Dispatch to all connected topics.
-func (r *Router) Dispatch(msg *Message) {
- for _, w := range r.wildcard {
- if w.glob.Match(msg.Topic) {
- for _, upstream := range w.upstream {
- upstream <- msg
- }
- }
- }
-
- if routes, ok := r.routes[msg.Topic]; ok {
- for _, upstream := range routes {
- upstream <- msg
- }
- }
-}
-
-// Subscribe to topic and return list of newly assigned topics.
-func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
- newTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- r.routes[topic] = []chan *Message{upstream}
- if !r.collapsed(topic) {
- newTopics = append(newTopics, topic)
- }
- continue
- }
-
- joined := false
- for _, up := range r.routes[topic] {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- r.routes[topic] = append(r.routes[topic], upstream)
- }
- }
-
- return newTopics
-}
-
-// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
-func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
- dropTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- // no such topic, ignore
- continue
- }
-
- for i := range r.routes[topic] {
- if r.routes[topic][i] == upstream {
- r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
- break
- }
- }
-
- if len(r.routes[topic]) == 0 {
- delete(r.routes, topic)
-
- // standalone empty subscription
- if !r.collapsed(topic) {
- dropTopics = append(dropTopics, topic)
- }
- }
- }
-
- return dropTopics
-}
-
-// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
-// case if blob is invalid.
-func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
- if w, ok := r.wildcard[pattern]; ok {
- joined := false
- for _, up := range w.upstream {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- w.upstream = append(w.upstream, upstream)
- }
-
- return nil, nil
- }
-
- g, err := glob.Compile(pattern)
- if err != nil {
- return nil, err
- }
-
- r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
-
- return []string{pattern}, nil
-}
-
-// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
-func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
- // todo: store and return collapsed topics
-
- w, ok := r.wildcard[pattern]
- if !ok {
- // no such pattern
- return nil
- }
-
- for i, up := range w.upstream {
- if up == upstream {
- w.upstream[i] = w.upstream[len(w.upstream)-1]
- w.upstream[len(w.upstream)-1] = nil
- w.upstream = w.upstream[:len(w.upstream)-1]
-
- if len(w.upstream) == 0 {
- delete(r.wildcard, pattern)
- return []string{pattern}
- }
- }
- }
-
- return nil
-}
-
-func (r *Router) collapsed(topic string) bool {
- for _, w := range r.wildcard {
- if w.glob.Match(topic) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go
deleted file mode 100644
index 5604a574..00000000
--- a/plugins/broadcast/root/rpc.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package broadcast
-
-import "golang.org/x/sync/errgroup"
-
-type rpcService struct {
- svc *Service
-}
-
-// Publish Messages.
-func (r *rpcService) Publish(msg []*Message, ok *bool) error {
- *ok = true
- return r.svc.Publish(msg...)
-}
-
-// Publish Messages in async mode. Blocks until get an err or nil from publish
-func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error {
- *ok = true
- g := &errgroup.Group{}
-
- g.Go(func() error {
- return r.svc.Publish(msg...)
- })
-
- return g.Wait()
-}
diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go
deleted file mode 100644
index 157c4e70..00000000
--- a/plugins/broadcast/root/rpc_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package broadcast
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestRPC_Broadcast(t *testing.T) {
- br, rpc, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- rcpClient, err := rpc.Client()
- assert.NoError(t, err)
-
- // must not be delivered
- ok := false
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello1"`)},
- &ok,
- ))
- assert.True(t, ok)
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello1"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello1"`, readStr(<-client.Channel()))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello2"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello2"`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Unsubscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello3"`)},
- &ok,
- ))
- assert.True(t, ok)
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello4"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello4"`, readStr(<-client.Channel()))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.PublishAsync",
- []*Message{newMessage("topic", `"hello5"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello5"`, readStr(<-client.Channel()))
-}
diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go
deleted file mode 100644
index 8b175b3e..00000000
--- a/plugins/broadcast/root/service.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package broadcast
-
-import (
- "errors"
- "sync"
-
- "github.com/spiral/roadrunner/service/rpc"
-)
-
-// ID defines public service name.
-const ID = "broadcast"
-
-// Service manages even broadcasting and websocket interface.
-type Service struct {
- // service and broker configuration
- cfg *Config
-
- // broker
- mu sync.Mutex
- broker Broker
-}
-
-// Init service.
-func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) {
- s.cfg = cfg
-
- if rpc != nil {
- if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
- return false, err
- }
- }
-
- s.mu.Lock()
- if s.cfg.Redis != nil {
- if s.broker, err = redisBroker(s.cfg.Redis); err != nil {
- return false, err
- }
- } else {
- s.broker = memoryBroker()
- }
- s.mu.Unlock()
-
- return true, nil
-}
-
-// Serve broadcast broker.
-func (s *Service) Serve() (err error) {
- return s.broker.Serve()
-}
-
-// Stop closes broadcast broker.
-func (s *Service) Stop() {
- broker := s.Broker()
- if broker != nil {
- broker.Stop()
- }
-}
-
-// Broker returns associated broker.
-func (s *Service) Broker() Broker {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- return s.broker
-}
-
-// NewClient returns single connected client with ability to consume or produce into associated topic(svc).
-func (s *Service) NewClient() *Client {
- return &Client{
- upstream: make(chan *Message),
- broker: s.Broker(),
- topics: make([]string, 0),
- patterns: make([]string, 0),
- }
-}
-
-// Publish one or multiple Channel.
-func (s *Service) Publish(msg ...*Message) error {
- broker := s.Broker()
- if broker == nil {
- return errors.New("no stopped broker")
- }
-
- return s.Broker().Publish(msg...)
-}
diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go
deleted file mode 100644
index 10b924cc..00000000
--- a/plugins/broadcast/root/service_test.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package broadcast
-
-import (
- "fmt"
- "strings"
- "testing"
- "time"
-
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-var rpcPort = 6010
-
-func setup(cfg string) (*Service, *rpc.Service, service.Container) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{
- broadcast: cfg,
- rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort),
- })
-
- rpcPort++
-
- if err != nil {
- panic(err)
- }
-
- go func() {
- err = c.Serve()
- if err != nil {
- panic(err)
- }
- }()
- time.Sleep(time.Millisecond * 100)
-
- b, _ := c.Get(ID)
- br := b.(*Service)
-
- r, _ := c.Get(rpc.ID)
- rp := r.(*rpc.Service)
-
- return br, rp, c
-}
-
-func readStr(m *Message) string {
- return strings.TrimRight(string(m.Payload), "\n")
-}
-
-func newMessage(t, m string) *Message {
- return &Message{Topic: t, Payload: []byte(m)}
-}
-
-func TestService_Publish(t *testing.T) {
- svc := &Service{}
- assert.Error(t, svc.Publish(nil))
-}
diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml
deleted file mode 100644
index c35a12fc..00000000
--- a/plugins/broadcast/root/tests/.rr.yaml
+++ /dev/null
@@ -1,2 +0,0 @@
-broadcast:
- redis.addr: "localhost:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
deleted file mode 100644
index d6014bf0..00000000
--- a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
+++ /dev/null
@@ -1,56 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-namespace Spiral\Broadcast\Tests;
-
-use PHPUnit\Framework\TestCase;
-use Spiral\Broadcast\Broadcast;
-use Spiral\Broadcast\Exception\BroadcastException;
-use Spiral\Broadcast\Message;
-use Spiral\Goridge\RPC;
-use Spiral\Goridge\SocketRelay;
-
-class BroadcastTest extends TestCase
-{
- public function testBroadcast(): void
- {
- $rpc = new RPC(new SocketRelay('localhost', 6001));
- $br = new Broadcast($rpc);
-
- $br->publish(
- new Message('tests/topic', 'hello'),
- new Message('tests/123', ['key' => 'value'])
- );
-
- while (filesize(__DIR__ . '/../log.txt') < 40) {
- clearstatcache(true, __DIR__ . '/../log.txt');
- usleep(1000);
- }
-
- clearstatcache(true, __DIR__ . '/../log.txt');
- $content = file_get_contents(__DIR__ . '/../log.txt');
-
- $this->assertSame('tests/topic: "hello"
-tests/123: {"key":"value"}
-', $content);
- }
-
- public function testBroadcastException(): void
- {
- $rpc = new RPC(new SocketRelay('localhost', 6002));
- $br = new Broadcast($rpc);
-
- $this->expectException(BroadcastException::class);
- $br->publish(
- new Message('topic', 'hello')
- );
- }
-}
diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
deleted file mode 100644
index dd9e1cc3..00000000
--- a/plugins/broadcast/root/tests/Broadcast/MessageTest.php
+++ /dev/null
@@ -1,24 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-namespace Spiral\Broadcast\Tests;
-
-use PHPUnit\Framework\TestCase;
-use Spiral\Broadcast\Message;
-
-class MessageTest extends TestCase
-{
- public function testSerialize(): void
- {
- $m = new Message('topic', ['hello' => 'world']);
- $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m));
- }
-}
diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php
deleted file mode 100644
index d0dfb88b..00000000
--- a/plugins/broadcast/root/tests/bootstrap.php
+++ /dev/null
@@ -1,15 +0,0 @@
-<?php
-
-/**
- * Spiral Framework, SpiralScout LLC.
- *
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-error_reporting(E_ALL | E_STRICT);
-ini_set('display_errors', 'stderr');
-
-//Composer
-require dirname(__DIR__) . '/vendor_php/autoload.php';
diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml
deleted file mode 100644
index 123aa9b9..00000000
--- a/plugins/broadcast/root/tests/docker-compose.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-version: '3'
-
-services:
- redis:
- image: 'bitnami/redis:latest'
- environment:
- - ALLOW_EMPTY_PASSWORD=yes
- ports:
- - "6379:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go
deleted file mode 100644
index 21442a01..00000000
--- a/plugins/broadcast/root/tests/go-client.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package main
-
-import (
- "fmt"
- "os"
-
- "github.com/spiral/broadcast/v2"
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/service/rpc"
- "golang.org/x/sync/errgroup"
-)
-
-type logService struct {
- broadcast *broadcast.Service
- stop chan interface{}
-}
-
-func (l *logService) Init(service *broadcast.Service) (bool, error) {
- l.broadcast = service
-
- return true, nil
-}
-
-func (l *logService) Serve() error {
- l.stop = make(chan interface{})
-
- client := l.broadcast.NewClient()
- if err := client.SubscribePattern("tests/*"); err != nil {
- return err
- }
-
- logFile, _ := os.Create("log.txt")
-
- g := &errgroup.Group{}
- g.Go(func() error {
- for msg := range client.Channel() {
- _, err := logFile.Write([]byte(fmt.Sprintf(
- "%s: %s\n",
- msg.Topic,
- string(msg.Payload),
- )))
- if err != nil {
- return err
- }
-
- err = logFile.Sync()
- if err != nil {
- return err
- }
- }
- return nil
- })
-
- <-l.stop
- err := logFile.Close()
- if err != nil {
- return err
- }
-
- err = client.Close()
- if err != nil {
- return err
- }
-
- return g.Wait()
-}
-
-func (l *logService) Stop() {
- close(l.stop)
-}
-
-func main() {
- rr.Container.Register(rpc.ID, &rpc.Service{})
- rr.Container.Register(broadcast.ID, &broadcast.Service{})
- rr.Container.Register("log", &logService{})
-
- rr.Execute()
-}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index aa270f64..92a2f368 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -1 +1,27 @@
package broadcast
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ log logger.Logger
+ svc *Plugin
+}
+
+
+func (r *rpc) Publish(msg []*Message, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+ err := r.svc.Publish(msg)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync() {
+
+}
diff --git a/plugins/broadcast/websockets/Makefile b/plugins/broadcast/websockets/Makefile
deleted file mode 100644
index f32efbdb..00000000
--- a/plugins/broadcast/websockets/Makefile
+++ /dev/null
@@ -1,2 +0,0 @@
-est:
- go test -v -race -cover
diff --git a/plugins/broadcast/websockets/access_validator.go b/plugins/broadcast/websockets/access_validator.go
deleted file mode 100644
index bf27386d..00000000
--- a/plugins/broadcast/websockets/access_validator.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package websockets
-
-import (
- "bytes"
- "io"
- "net/http"
- "strings"
-
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
-)
-
-type accessValidator struct {
- buffer *bytes.Buffer
- header http.Header
- status int
-}
-
-func newValidator() *accessValidator {
- return &accessValidator{
- buffer: bytes.NewBuffer(nil),
- header: make(http.Header),
- }
-}
-
-// copy all content to parent response writer.
-func (w *accessValidator) copy(rw http.ResponseWriter) {
- rw.WriteHeader(w.status)
-
- for k, v := range w.header {
- for _, vv := range v {
- rw.Header().Add(k, vv)
- }
- }
-
- _, _ = io.Copy(rw, w.buffer)
-}
-
-// Header returns the header map that will be sent by WriteHeader.
-func (w *accessValidator) Header() http.Header {
- return w.header
-}
-
-// Write writes the data to the connection as part of an HTTP reply.
-func (w *accessValidator) Write(p []byte) (int, error) {
- return w.buffer.Write(p)
-}
-
-// WriteHeader sends an HTTP response header with the provided status code.
-func (w *accessValidator) WriteHeader(statusCode int) {
- w.status = statusCode
-}
-
-// IsOK returns true if response contained 200 status code.
-func (w *accessValidator) IsOK() bool {
- return w.status == 200
-}
-
-// Body returns response body to rely to user.
-func (w *accessValidator) Body() []byte {
- return w.buffer.Bytes()
-}
-
-// Error contains server response.
-func (w *accessValidator) Error() string {
- return w.buffer.String()
-}
-
-// assertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in
-// case of error
-func (w *accessValidator) assertServerAccess(f http.HandlerFunc, r *http.Request) error {
- if err := attributes.Set(r, "ws:joinServer", true); err != nil {
- return err
- }
-
- defer delete(attributes.All(r), "ws:joinServer")
-
- f(w, r)
-
- if !w.IsOK() {
- return w
- }
-
- return nil
-}
-
-// assertAccess checks if user can access given upstream, the application will receive all user headers and cookies.
-// the decision to authorize user will be based on response code (200).
-func (w *accessValidator) assertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
- if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
- return err
- }
-
- defer delete(attributes.All(r), "ws:joinTopics")
-
- f(w, r)
-
- if !w.IsOK() {
- return w
- }
-
- return nil
-}
diff --git a/plugins/broadcast/websockets/access_validator_test.go b/plugins/broadcast/websockets/access_validator_test.go
deleted file mode 100644
index 41372727..00000000
--- a/plugins/broadcast/websockets/access_validator_test.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package websockets
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestResponseWrapper_Body(t *testing.T) {
- w := newValidator()
- _, _ =w.Write([]byte("hello"))
-
- assert.Equal(t, []byte("hello"), w.Body())
-}
-
-func TestResponseWrapper_Header(t *testing.T) {
- w := newValidator()
- w.Header().Set("k", "value")
-
- assert.Equal(t, "value", w.Header().Get("k"))
-}
-
-func TestResponseWrapper_StatusCode(t *testing.T) {
- w := newValidator()
- w.WriteHeader(200)
-
- assert.True(t, w.IsOK())
-}
-
-func TestResponseWrapper_StatusCodeBad(t *testing.T) {
- w := newValidator()
- w.WriteHeader(400)
-
- assert.False(t, w.IsOK())
-}
diff --git a/plugins/broadcast/websockets/config.go b/plugins/broadcast/websockets/config.go
deleted file mode 100644
index 8a71c7af..00000000
--- a/plugins/broadcast/websockets/config.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package websockets
-
-
-// Config defines the websocket service configuration.
-type Config struct {
- // Path defines on this URL the middleware must be activated. Same path must
- // be handled by underlying application kernel to authorize the consumption.
- Path string
-
- // NoOrigin disables origin check, only for debug.
- NoOrigin bool
-}
-
-// Hydrate reads the configuration values from the source configuration.
-//func (c *Config) Hydrate(cfg service.Config) error {
-// if err := cfg.Unmarshal(c); err != nil {
-// return err
-// }
-//
-// return nil
-//}
diff --git a/plugins/broadcast/websockets/config_test.go b/plugins/broadcast/websockets/config_test.go
deleted file mode 100644
index e646fdc4..00000000
--- a/plugins/broadcast/websockets/config_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package websockets
-
-import (
- "encoding/json"
- "testing"
-
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config {
- if name == "same" || name == "jobs" {
- return cfg
- }
-
- return nil
-}
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{cfg: `{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_OK(t *testing.T) {
- cfg := &mockCfg{cfg: `{"path":"/path"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/broadcast/websockets/conn_context.go b/plugins/broadcast/websockets/conn_context.go
deleted file mode 100644
index f7d62833..00000000
--- a/plugins/broadcast/websockets/conn_context.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package websockets
-
-import (
- "encoding/json"
-
- "github.com/gorilla/websocket"
-)
-
-// ConnContext carries information about websocket connection and it's topics.
-type ConnContext struct {
- // Conn to the client.
- Conn *websocket.Conn
-
- // Topics contain list of currently subscribed topics.
- Topics []string
-
- // upstream to push messages into.
- upstream chan *broadcast.Message
-}
-
-// SendMessage message directly to the client.
-func (ctx *ConnContext) SendMessage(topic string, payload interface{}) (err error) {
- msg := &broadcast.Message{Topic: topic}
- msg.Payload, err = json.Marshal(payload)
-
- if err == nil {
- ctx.upstream <- msg
- }
-
- return err
-}
-
-func (ctx *ConnContext) serve(errHandler func(err error, conn *websocket.Conn)) {
- for msg := range ctx.upstream {
- if err := ctx.Conn.WriteJSON(msg); err != nil {
- errHandler(err, ctx.Conn)
- }
- }
-}
-
-func (ctx *ConnContext) addTopics(topics ...string) {
- for _, topic := range topics {
- found := false
- for _, e := range ctx.Topics {
- if e == topic {
- found = true
- break
- }
- }
-
- if !found {
- ctx.Topics = append(ctx.Topics, topic)
- }
- }
-}
-
-func (ctx *ConnContext) dropTopic(topics ...string) {
- for _, topic := range topics {
- for i, e := range ctx.Topics {
- if e == topic {
- ctx.Topics[i] = ctx.Topics[len(ctx.Topics)-1]
- ctx.Topics = ctx.Topics[:len(ctx.Topics)-1]
- }
- }
- }
-}
diff --git a/plugins/broadcast/websockets/conn_context_test.go b/plugins/broadcast/websockets/conn_context_test.go
deleted file mode 100644
index 466aaa30..00000000
--- a/plugins/broadcast/websockets/conn_context_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package websockets
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestConnContext_ManageTopics(t *testing.T) {
- ctx := &ConnContext{Topics: make([]string, 0)}
-
- assert.Equal(t, []string{}, ctx.Topics)
-
- ctx.addTopics("a", "b")
- assert.Equal(t, []string{"a", "b"}, ctx.Topics)
-
- ctx.addTopics("a", "c")
- assert.Equal(t, []string{"a", "b", "c"}, ctx.Topics)
-
- ctx.dropTopic("b", "c")
- assert.Equal(t, []string{"a"}, ctx.Topics)
-
- ctx.dropTopic("b", "c")
- assert.Equal(t, []string{"a"}, ctx.Topics)
-
- ctx.dropTopic("a")
- assert.Equal(t, []string{}, ctx.Topics)
-}
diff --git a/plugins/broadcast/websockets/conn_pool.go b/plugins/broadcast/websockets/conn_pool.go
deleted file mode 100644
index 80092a44..00000000
--- a/plugins/broadcast/websockets/conn_pool.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package websockets
-
-import (
- "errors"
- "sync"
-
- "github.com/gorilla/websocket"
- "github.com/spiral/broadcast/v2"
-)
-
-// manages a set of websocket connections
-type connPool struct {
- errHandler func(err error, conn *websocket.Conn)
-
- mur sync.Mutex
- client *broadcast.Client
- router *broadcast.Router
-
- mu sync.Mutex
- conns map[*websocket.Conn]*ConnContext
-}
-
-// create new connection pool
-func newPool(client *broadcast.Client, errHandler func(err error, conn *websocket.Conn)) *connPool {
- cp := &connPool{
- client: client,
- router: broadcast.NewRouter(),
- errHandler: errHandler,
- conns: map[*websocket.Conn]*ConnContext{},
- }
-
- go func() {
- for msg := range cp.client.Channel() {
- cp.mur.Lock()
- cp.router.Dispatch(msg)
- cp.mur.Unlock()
- }
- }()
-
- return cp
-}
-
-// connect the websocket and register client in message router
-func (cp *connPool) connect(conn *websocket.Conn) (*ConnContext, error) {
- ctx := &ConnContext{
- Conn: conn,
- Topics: []string{},
- upstream: make(chan *broadcast.Message),
- }
-
- cp.mu.Lock()
- cp.conns[conn] = ctx
- cp.mu.Unlock()
-
- go ctx.serve(cp.errHandler)
-
- return ctx, nil
-}
-
-// disconnect the websocket
-func (cp *connPool) disconnect(conn *websocket.Conn) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- ctx, ok := cp.conns[conn]
- if !ok {
- return errors.New("no such connection")
- }
-
- if err := cp.unsubscribe(ctx, ctx.Topics...); err != nil {
- cp.errHandler(err, conn)
- }
-
- delete(cp.conns, conn)
-
- return conn.Close()
-}
-
-// subscribe the connection
-func (cp *connPool) subscribe(ctx *ConnContext, topics ...string) error {
- cp.mur.Lock()
- defer cp.mur.Unlock()
-
- ctx.addTopics(topics...)
-
- newTopics := cp.router.Subscribe(ctx.upstream, topics...)
- if len(newTopics) != 0 {
- return cp.client.Subscribe(newTopics...)
- }
-
- return nil
-}
-
-// unsubscribe the connection
-func (cp *connPool) unsubscribe(ctx *ConnContext, topics ...string) error {
- cp.mur.Lock()
- defer cp.mur.Unlock()
-
- ctx.dropTopic(topics...)
-
- dropTopics := cp.router.Unsubscribe(ctx.upstream, topics...)
- if len(dropTopics) != 0 {
- return cp.client.Unsubscribe(dropTopics...)
- }
-
- return nil
-}
-
-// close the connection pool and disconnect all listeners
-func (cp *connPool) close() {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- for conn, ctx := range cp.conns {
- if err := cp.unsubscribe(ctx, ctx.Topics...); err != nil {
- cp.errHandler(err, conn)
- }
-
- delete(cp.conns, conn)
-
- if err := conn.Close(); err != nil {
- cp.errHandler(err, conn)
- }
- }
-}
diff --git a/plugins/broadcast/websockets/event.go b/plugins/broadcast/websockets/event.go
deleted file mode 100644
index 3634bb89..00000000
--- a/plugins/broadcast/websockets/event.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package websockets
-
-import (
- "github.com/gorilla/websocket"
-)
-
-const (
- // EventConnect fired when new client is connected, the context is *websocket.Conn.
- EventConnect = iota + 2500
-
- // EventDisconnect fired when websocket is disconnected, context is empty.
- EventDisconnect
-
- // EventJoin caused when topics are being consumed, context if *TopicEvent.
- EventJoin
-
- // EventLeave caused when topic consumption are stopped, context if *TopicEvent.
- EventLeave
-
- // EventError when any broadcast error occurred, the context is *ErrorEvent.
- EventError
-)
-
-// ErrorEvent represents singular broadcast error event.
-type ErrorEvent struct {
- // Conn specific to the error.
- Conn *websocket.Conn
-
- // Error contains job specific error.
- Error error
-}
-
-// TopicEvent caused when topic is joined or left.
-type TopicEvent struct {
- // Conn associated with topics.
- Conn *websocket.Conn
-
- // Topics specific to event.
- Topics []string
-}
diff --git a/plugins/broadcast/websockets/rpc.go b/plugins/broadcast/websockets/rpc.go
deleted file mode 100644
index 1c62b902..00000000
--- a/plugins/broadcast/websockets/rpc.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package websockets
-
-type rpcService struct {
- svc *Service
-}
-
-// Subscribe subscribes broadcast client to the given topic ahead of any websocket connections.
-func (r *rpcService) Subscribe(topic string, ok *bool) error {
- *ok = true
- return r.svc.client.Subscribe(topic)
-}
-
-// SubscribePattern subscribes broadcast client to
-func (r *rpcService) SubscribePattern(pattern string, ok *bool) error {
- *ok = true
- return r.svc.client.SubscribePattern(pattern)
-}
diff --git a/plugins/broadcast/websockets/service.go b/plugins/broadcast/websockets/service.go
deleted file mode 100644
index f3c0c781..00000000
--- a/plugins/broadcast/websockets/service.go
+++ /dev/null
@@ -1,228 +0,0 @@
-package websockets
-
-import (
- "encoding/json"
- "net/http"
- "sync"
- "sync/atomic"
-
- "github.com/gorilla/websocket"
-)
-
-// ID defines service id.
-const ID = "ws"
-
-// Service to manage websocket clients.
-type Service struct {
- cfg *Config
- upgrade websocket.Upgrader
- client *broadcast.Client
- connPool *connPool
- listeners []func(event int, ctx interface{})
- mu sync.Mutex
- stopped int32
- stop chan error
-}
-
-// AddListener attaches server event controller.
-func (s *Service) AddListener(l func(event int, ctx interface{})) {
- s.listeners = append(s.listeners, l)
-}
-
-// Init the service.
-func (s *Service) Init(
- cfg *Config,
- env env.Environment,
- rttp *rhttp.Service,
- rpc *rpc.Service,
- broadcast *broadcast.Service,
-) (bool, error) {
- if broadcast == nil || rpc == nil {
- // unable to activate
- return false, nil
- }
-
- s.cfg = cfg
- s.client = broadcast.NewClient()
- s.connPool = newPool(s.client, s.reportError)
- s.stopped = 0
-
- if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
- return false, err
- }
-
- if env != nil {
- // ensure that underlying kernel knows what route to handle
- env.SetEnv("RR_BROADCAST_PATH", cfg.Path)
- }
-
- // init all this stuff
- s.upgrade = websocket.Upgrader{}
-
- if s.cfg.NoOrigin {
- s.upgrade.CheckOrigin = func(r *http.Request) bool {
- return true
- }
- }
-
- rttp.AddMiddleware(s.middleware)
-
- return true, nil
-}
-
-// Serve the websocket connections.
-func (s *Service) Serve() error {
- defer s.client.Close()
- defer s.connPool.close()
-
- s.mu.Lock()
- s.stop = make(chan error)
- s.mu.Unlock()
-
- return <-s.stop
-}
-
-// Stop the service and disconnect all connections.
-func (s *Service) Stop() {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if atomic.CompareAndSwapInt32(&s.stopped, 0, 1) {
- close(s.stop)
- }
-}
-
-// middleware intercepts websocket connections.
-func (s *Service) middleware(f http.HandlerFunc) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != s.cfg.Path {
- f(w, r)
- return
- }
-
- // checking server access
- if err := newValidator().assertServerAccess(f, r); err != nil {
- // show the error to the user
- if av, ok := err.(*accessValidator); ok {
- av.copy(w)
- } else {
- w.WriteHeader(400)
- }
- return
- }
-
- conn, err := s.upgrade.Upgrade(w, r, nil)
- if err != nil {
- s.reportError(err, nil)
- return
- }
-
- s.throw(EventConnect, conn)
-
- // manage connection
- ctx, err := s.connPool.connect(conn)
- if err != nil {
- s.reportError(err, conn)
- return
- }
-
- s.serveConn(ctx, f, r)
- }
-}
-
-// send and receive messages over websocket
-func (s *Service) serveConn(ctx *ConnContext, f http.HandlerFunc, r *http.Request) {
- defer func() {
- if err := s.connPool.disconnect(ctx.Conn); err != nil {
- s.reportError(err, ctx.Conn)
- }
- s.throw(EventDisconnect, ctx.Conn)
- }()
-
- s.handleCommands(ctx, f, r)
-}
-
-func (s *Service) handleCommands(ctx *ConnContext, f http.HandlerFunc, r *http.Request) {
- cmd := &broadcast.Message{}
- for {
- if err := ctx.Conn.ReadJSON(cmd); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- switch cmd.Topic {
- case "join":
- topics := make([]string, 0)
- if err := unmarshalCommand(cmd, &topics); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- if len(topics) == 0 {
- continue
- }
-
- if err := newValidator().assertTopicsAccess(f, r, topics...); err != nil {
- s.reportError(err, ctx.Conn)
-
- if err := ctx.SendMessage("#join", topics); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- continue
- }
-
- if err := s.connPool.subscribe(ctx, topics...); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- if err := ctx.SendMessage("@join", topics); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- s.throw(EventJoin, &TopicEvent{Conn: ctx.Conn, Topics: topics})
- case "leave":
- topics := make([]string, 0)
- if err := unmarshalCommand(cmd, &topics); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- if len(topics) == 0 {
- continue
- }
-
- if err := s.connPool.unsubscribe(ctx, topics...); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- if err := ctx.SendMessage("@leave", topics); err != nil {
- s.reportError(err, ctx.Conn)
- return
- }
-
- s.throw(EventLeave, &TopicEvent{Conn: ctx.Conn, Topics: topics})
- }
- }
-}
-
-// handle connection error
-func (s *Service) reportError(err error, conn *websocket.Conn) {
- s.throw(EventError, &ErrorEvent{Conn: conn, Error: err})
-}
-
-// throw handles service, server and pool events.
-func (s *Service) throw(event int, ctx interface{}) {
- for _, l := range s.listeners {
- l(event, ctx)
- }
-}
-
-// unmarshalCommand command data.
-func unmarshalCommand(msg *broadcast.Message, v interface{}) error {
- return json.Unmarshal(msg.Payload, v)
-}
diff --git a/plugins/broadcast/websockets/service_test.go b/plugins/broadcast/websockets/service_test.go
deleted file mode 100644
index 911efc38..00000000
--- a/plugins/broadcast/websockets/service_test.go
+++ /dev/null
@@ -1,706 +0,0 @@
-package websockets
-
-import (
- "encoding/json"
- "io/ioutil"
- "net/http"
- "net/url"
- "strings"
- "testing"
- "time"
-
- "github.com/gorilla/websocket"
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/broadcast/v2"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- rrhttp "github.com/spiral/roadrunner/service/http"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-type testCfg struct {
- http string
- rpc string
- ws string
- broadcast string
- target string
-}
-
-func (cfg *testCfg) Get(name string) service.Config {
- if name == rrhttp.ID {
- return &testCfg{target: cfg.http}
- }
-
- if name == ID {
- return &testCfg{target: cfg.ws}
- }
-
- if name == rpc.ID {
- return &testCfg{target: cfg.rpc}
- }
-
- if name == broadcast.ID {
- return &testCfg{target: cfg.broadcast}
- }
-
- return nil
-}
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- return json.Unmarshal([]byte(cfg.target), out)
-}
-
-func readStr(m interface{}) string {
- return strings.TrimRight(string(m.([]byte)), "\n")
-}
-
-func Test_HttpService_Echo(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rrhttp.ID, &rrhttp.Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6041",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 3000)
- defer c.Stop()
-
- req, err := http.NewRequest("GET", "http://localhost:6041/", nil)
- assert.NoError(t, err)
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- _ = r.Body.Close()
- }()
-
- b, _ := ioutil.ReadAll(r.Body)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
- assert.Equal(t, []byte(""), b)
-}
-
-func Test_HttpService_Echo400(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rrhttp.ID, &rrhttp.Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6040",
- "workers":{"command": "php tests/worker-stop.php", "pool.numWorkers": 1}
- }`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 3000)
- defer c.Stop()
-
- req, err := http.NewRequest("GET", "http://localhost:6040/", nil)
- assert.NoError(t, err)
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- _ = r.Body.Close()
- }()
-
- assert.NoError(t, err)
- assert.Equal(t, 401, r.StatusCode)
-}
-
-func Test_Service_EnvPath(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6029",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6002"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 3000)
- defer c.Stop()
-
- req, err := http.NewRequest("GET", "http://localhost:6029/", nil)
- assert.NoError(t, err)
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- if err != nil {
- panic(err)
- }
- defer func() {
- _ = r.Body.Close()
- }()
-
- b, _ := ioutil.ReadAll(r.Body)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
- assert.Equal(t, []byte("/ws"), b)
-}
-
-func Test_Service_Disabled(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- _, s := c.Get(ID)
- assert.Equal(t, service.StatusInactive, s)
-}
-
-func Test_Service_JoinTopic(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6038",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6003"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6038", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- return
- }
- read <- message
- }
- }()
-
- err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
- assert.NoError(t, err)
-
- assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
-}
-
-func Test_Service_DenyJoin(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6037",
- "workers":{"command": "php tests/worker-deny.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6004"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6037", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- read <- err
- continue
- }
- read <- message
- }
- }()
-
- err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
- assert.NoError(t, err)
-
- assert.Equal(t, `{"topic":"#join","payload":["topic"]}`, readStr(<-read))
-}
-
-func Test_Service_DenyJoinServer(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6037",
- "workers":{"command": "php tests/worker-stop.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6005"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6037", Path: "/ws"}
-
- _, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.Error(t, err)
-}
-
-func Test_Service_EmptyTopics(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6036",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6006"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6036", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- read <- err
- continue
- }
- read <- message
- }
- }()
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":[]}`)))
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["a"]}`)))
- assert.Equal(t, `{"topic":"@join","payload":["a"]}`, readStr(<-read))
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":[]}`)))
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":["a"]}`)))
- assert.Equal(t, `{"topic":"@leave","payload":["a"]}`, readStr(<-read))
-
- // must be automatically closed during service stop
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["a"]}`)))
- assert.Equal(t, `{"topic":"@join","payload":["a"]}`, readStr(<-read))
-}
-
-func Test_Service_BadTopics(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6035",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6007"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6035", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- read <- err
- continue
- }
- read <- message
- }
- }()
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":"hello"}`)))
- assert.Error(t, (<-read).(error))
-}
-
-func Test_Service_BadTopicsLeave(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6034",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6008"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6034", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- read <- err
- continue
- }
- read <- message
- }
- }()
-
- assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":"hello"}`)))
- assert.Error(t, (<-read).(error))
-}
-
-func Test_Service_Events(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6033",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6009"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- b, _ := c.Get(ID)
- br := b.(*Service)
-
- done := make(chan interface{})
- br.AddListener(func(event int, ctx interface{}) {
- if event == EventConnect {
- close(done)
- }
- })
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- <-done
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- return
- }
- read <- message
- }
- }()
-
- err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
- assert.NoError(t, err)
-
- assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
-}
-
-func Test_Service_Warmup(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6033",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6009"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- rp, _ := c.Get(rpc.ID)
-
- b, _ := c.Get(ID)
- br := b.(*Service)
-
- done := make(chan interface{})
- br.AddListener(func(event int, ctx interface{}) {
- if event == EventConnect {
- close(done)
- }
- })
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- client, err := rp.(*rpc.Service).Client()
- assert.NoError(t, err)
-
- var ok bool
- assert.NoError(t, client.Call("ws.SubscribePattern", "test", &ok))
- assert.True(t, ok)
- assert.NoError(t, client.Call("ws.Subscribe", "test", &ok))
- assert.True(t, ok)
-
- u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- <-done
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- return
- }
- read <- message
- }
- }()
-
- // not delivered
- assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
-
- err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
- assert.NoError(t, err)
-
- assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
-
- assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
- assert.Equal(t, `{"topic":"topic","payload":"hello"}`, readStr(<-read))
-}
-
-func Test_Service_Stop(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, &env.Service{})
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(rrhttp.ID, &rrhttp.Service{})
- c.Register(broadcast.ID, &broadcast.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- http: `{
- "address": ":6033",
- "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
- }`,
- rpc: `{"listen":"tcp://127.0.0.1:6009"}`,
- ws: `{"path":"/ws"}`,
- broadcast: `{}`,
- }))
-
- rp, _ := c.Get(rpc.ID)
-
- b, _ := c.Get(ID)
- br := b.(*Service)
-
- done := make(chan interface{})
- br.AddListener(func(event int, ctx interface{}) {
- if event == EventConnect {
- close(done)
- }
- })
-
- go func() { _ = c.Serve() }()
- time.Sleep(time.Millisecond * 1000)
- defer c.Stop()
-
- client, err := rp.(*rpc.Service).Client()
- assert.NoError(t, err)
-
- var ok bool
- assert.NoError(t, client.Call("ws.SubscribePattern", "test", &ok))
- assert.True(t, ok)
- assert.NoError(t, client.Call("ws.Subscribe", "test", &ok))
- assert.True(t, ok)
-
- u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
-
- conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
- assert.NoError(t, err)
- defer func() {
- _ = conn.Close()
- }()
-
- <-done
-
- read := make(chan interface{})
-
- go func() {
- defer close(read)
- for {
- _, message, err := conn.ReadMessage()
- if err != nil {
- return
- }
- read <- message
- }
- }()
-
- // not delivered
- assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
-
- br.Stop()
-
- err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
- assert.NoError(t, err)
-}
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 338d4339..26fccf79 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -21,7 +21,7 @@ func (s *Plugin) serveHTTP(errCh chan error) {
if s.http == nil {
return
}
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serveHTTP")
if len(s.mdwr) > 0 {
applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
@@ -43,7 +43,7 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
if s.https == nil {
return
}
- const op = errors.Op("http_plugin_serve_https")
+ const op = errors.Op("serveHTTPS")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
}
@@ -70,7 +70,7 @@ func (s *Plugin) serveFCGI(errCh chan error) {
if s.fcgi == nil {
return
}
- const op = errors.Op("http_plugin_serve_fcgi")
+ const op = errors.Op("serveFCGI")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
diff --git a/plugins/http/static/etag.go b/plugins/http/static/etag.go
index 70673337..c457b95e 100644
--- a/plugins/http/static/etag.go
+++ b/plugins/http/static/etag.go
@@ -5,9 +5,9 @@ import (
"io"
"net/http"
"os"
- "unsafe"
httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/utils"
)
const etag string = "Etag"
@@ -44,7 +44,7 @@ func SetEtag(cfg *httpConfig.Static, f *os.File, w http.ResponseWriter) {
calculatedEtag = appendUint(calculatedEtag, crc32.Checksum(body, crc32q))
calculatedEtag = append(calculatedEtag, '"')
- w.Header().Set(etag, byteToSrt(calculatedEtag))
+ w.Header().Set(etag, utils.AsString(calculatedEtag))
}
// appendUint appends n to dst and returns the extended dst.
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 2e2df527..0f647cb1 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
@@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- err := b.Delete([]byte(k))
+ err := b.Delete(utils.AsBytes(k))
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 17b06fa0..02281ed5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: []byte(items[i].Value),
+ Value: utils.AsBytes(items[i].Value),
Flags: 0,
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 1e0d03d4..c2494ee7 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(kv.Item).Value), nil
}
return nil, nil
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 240a28d1..2d4babbe 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -1,11 +1,10 @@
package kv
import (
- "unsafe"
-
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -31,10 +30,10 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
if !dataRoot.Items(tmpItem, i) {
continue
}
- keys = append(keys, strConvert(tmpItem.Key()))
+ keys = append(keys, utils.AsString(tmpItem.Key()))
}
- if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -73,7 +72,7 @@ func (r *rpc) Set(in []byte, ok *bool) error {
items = append(items, itc)
}
- if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
err := st.Set(items...)
if err != nil {
return err
@@ -104,7 +103,7 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
keys = append(keys, string(tmpItem.Key()))
}
- if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -143,7 +142,7 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
items = append(items, itc)
}
- if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
err := st.MExpire(items...)
if err != nil {
return errors.E(op, err)
@@ -173,7 +172,7 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
keys = append(keys, string(tmpItem.Key()))
}
- if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -201,7 +200,7 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
keys = append(keys, string(tmpItem.Key()))
}
- if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -215,4 +214,3 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
*ok = false
return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index f00a0fd3..479aa565 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -1,7 +1,7 @@
package logger
import (
- "unsafe"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output
@@ -12,7 +12,7 @@ type StdLogAdapter struct {
// Write io.Writer interface implementation
func (s *StdLogAdapter) Write(p []byte) (n int, err error) {
- s.log.Error("server internal error", "message", toString(p))
+ s.log.Error("server internal error", "message", utils.AsString(p))
return len(p), nil
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 13588b6e..320da372 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -6,7 +6,6 @@ import (
"os"
"os/exec"
"strings"
- "unsafe"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/transport"
@@ -239,10 +238,10 @@ func (server *Plugin) collectEvents(event interface{}) {
case events.EventWorkerError:
server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(toString(we.Payload.([]byte)), " \n\t"))
+ server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
// stderr event is INFO level
case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(toString(we.Payload.([]byte)), " \n\t"))
+ server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
}
}
}
@@ -253,10 +252,10 @@ func (server *Plugin) collectWorkerLogs(event interface{}) {
case events.EventWorkerError:
server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(toString(we.Payload.([]byte)), " \n\t"))
+ server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
// stderr event is INFO level
case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(toString(we.Payload.([]byte)), " \n\t"))
+ server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
}
}
}
diff --git a/plugins/service/process.go b/plugins/service/process.go
index 74aa789c..cac5c41e 100644
--- a/plugins/service/process.go
+++ b/plugins/service/process.go
@@ -7,10 +7,10 @@ import (
"sync/atomic"
"syscall"
"time"
- "unsafe"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Process structure contains an information about process, restart information, log, errors, etc
@@ -50,7 +50,7 @@ func NewServiceProcess(restartAfterExit bool, execTimeout time.Duration, restart
// write message to the log (stderr)
func (p *Process) Write(b []byte) (int, error) {
- p.log.Info(toString(b))
+ p.log.Info(utils.AsString(b))
return len(b), nil
}
diff --git a/utils/convert.go b/utils/convert.go
new file mode 100644
index 00000000..8d153ce5
--- /dev/null
+++ b/utils/convert.go
@@ -0,0 +1,34 @@
+package utils
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+// AsBytes returns a slice that refers to the data backing the string s.
+func AsBytes(s string) []byte {
+ // get the pointer to the data of the string
+ p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
+
+ var b []byte
+ hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ hdr.Data = uintptr(p)
+ // we need to set the cap and len for the string to byte convert
+ // because string is shorter than []bytes
+ hdr.Cap = len(s)
+ hdr.Len = len(s)
+
+ return b
+}
+
+// AsString returns a string that refers to the data backing the slice s.
+func AsString(b []byte) string {
+ p := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
+
+ var s string
+ hdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
+ hdr.Data = uintptr(p)
+ hdr.Len = len(b)
+
+ return s
+}