path: root/plugins
diff options
Diffstat (limited to 'plugins')
52 files changed, 2869 insertions, 901 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..aa270f64
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1 @@
+package broadcast
diff --git a/plugins/broadcast/doc/.rr-broadcast.yaml b/plugins/broadcast/doc/.rr-broadcast.yaml
new file mode 100644
index 00000000..a0a2ad5e
--- /dev/null
+++ b/plugins/broadcast/doc/.rr-broadcast.yaml
@@ -0,0 +1,10 @@
+# broadcast service configuration.rr.yaml
+ # path to enable web-socket handler middleware
+ path: /ws
+ # optional, redis broker configuration
+ redis:
+ addr: "localhost:6379"
+ passsword: ""
+ db: 0
diff --git a/plugins/broadcast/doc/broadcast.drawio b/plugins/broadcast/doc/broadcast.drawio
new file mode 100644
index 00000000..f9845dc8
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-03T16:49:17.087Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.128 Electron/12.0.6 Safari/537.36" etag="RZNtN_6682KfuWpR1T35" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">1ZhdU6MwFIZ/TWd2L9xJCVB72a9VZ6y6Vkd7mcIBsqaECcG2/voNEAqIdXSEddpelLzJCcl78pCGHp6st2eCRMGcu8B6BnK3PTztGUYfY6R+UmWXK0Nk5YIvqKsblcKCvoAWdZyfUBfiWkPJOZM0qosOD0NwZE0jQvBNvZnHWf2uEfGhISwcwprqA3VloNWhgcqKc6B+UNzaQLpmTYrWWogD4vJNRcKzHp4IzmV+td5OgKXuFcbkcb8P1O5HJiCUHwnYmfPLu+WYhM4dZk9/BEXh6GSoxyZ3xYzBVQboIhcy4D4PCZuV6ljwJHQh7RWpUtnmkvNIiX0l/gUpdzqbJJFcSYFcM10LWyofK9fLtKtfli5Nt7rnrLDThXyc6eAOTl9LMU+EA+/Mub83Xy1b4GuQYqfiBDAi6XO9f6LXj79vtw+94VTd2UB6rdtFnvVKN01U70IS4YPUUWWe1EVlGKWUZe8TmdQDfiYs0VMY316PppPR4k7J5/fKQfTj5vL+7OLqZyPp9ZRuAiphEZHMxY0Cu54+jzI24YyLLBa7BE49R+mxFPwJKjW2cworb5+8ZxAStu+nr5kWHWDiV/YWT4hNBcsiBUGFSBsdzmQtB5813DoWdJTDYvdYNkyLy2pdGZaVOkDO0I/uDIEO0fxSQo0GQQ+LVknxrPT7JinZJ43goazo+acdgvZ7qiZI7cUfI+j1g6w1w+3jI6jGT4lT9wThYyAINwi6nU0vWobI8wznze3GtVe2ZXcDi2l8NyyD44Olvt30/x8s5jHAYjZgubg6mc/m17fLloEB+wAwg+EKoW6AsezvBua04a+aOlOHxDj1NlmdRCzxaRg3zFYmyLqjhFE/VNeOMgOUeePUKqpOiSNdsaaumxMHMX0hq6yr1Nko/bufzcwa96xp2peCLM5567djPX5lPR580HqjK+uLAVU3ghvVFRrft7sZQN+1YPDW2h7aA0xa2gwaR7sOzx6qWL4SyM+G5ZsVPPsH</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/memory/memory.go b/plugins/broadcast/memory/memory.go
new file mode 100644
index 00000000..5b85d68f
--- /dev/null
+++ b/plugins/broadcast/memory/memory.go
@@ -0,0 +1,131 @@
+package memory
+import (
+ "errors"
+ "sync/atomic"
+// Memory manages broadcasting in memory.
+type Memory struct {
+ router *Router
+ messages chan *Message
+ join, leave chan subscriber
+ stop chan interface{}
+ stopped int32
+// memoryBroker creates new memory based message broker.
+func memoryBroker() *Memory {
+ return &Memory{
+ router: NewRouter(),
+ messages: make(chan *Message),
+ join: make(chan subscriber),
+ leave: make(chan subscriber),
+ stop: make(chan interface{}),
+ stopped: 0,
+ }
+// Serve serves broker.
+func (m *Memory) Serve() error {
+ for {
+ select {
+ case ctx := <-m.join:
+ ctx.done <- m.handleJoin(ctx)
+ case ctx := <-m.leave:
+ ctx.done <- m.handleLeave(ctx)
+ case msg := <-m.messages:
+ m.router.Dispatch(msg)
+ case <-m.stop:
+ return nil
+ }
+ }
+func (m *Memory) handleJoin(sub subscriber) (err error) {
+ if sub.pattern != "" {
+ _, err = m.router.SubscribePattern(sub.upstream, sub.pattern)
+ return err
+ }
+ m.router.Subscribe(sub.upstream, sub.topics...)
+ return nil
+func (m *Memory) handleLeave(sub subscriber) error {
+ if sub.pattern != "" {
+ m.router.UnsubscribePattern(sub.upstream, sub.pattern)
+ return nil
+ }
+ m.router.Unsubscribe(sub.upstream, sub.topics...)
+ return nil
+// Stop closes the consumption and disconnects broker.
+func (m *Memory) Stop() {
+ if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) {
+ close(m.stop)
+ }
+// Subscribe broker to one or multiple channels.
+func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error {
+ if atomic.LoadInt32(&m.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
+ m.join <- ctx
+ return <-ctx.done
+// SubscribePattern broker to pattern.
+func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error {
+ if atomic.LoadInt32(&m.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
+ m.join <- ctx
+ return <-ctx.done
+// Unsubscribe broker from one or multiple channels.
+func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error {
+ if atomic.LoadInt32(&m.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
+ m.leave <- ctx
+ return <-ctx.done
+// UnsubscribePattern broker from pattern.
+func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error {
+ if atomic.LoadInt32(&m.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
+ m.leave <- ctx
+ return <-ctx.done
+// Publish one or multiple Channel.
+func (m *Memory) Publish(messages ...*Message) error {
+ if atomic.LoadInt32(&m.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ for _, msg := range messages {
+ m.messages <- msg
+ }
+ return nil
diff --git a/plugins/broadcast/memory/memory_test.go b/plugins/broadcast/memory/memory_test.go
new file mode 100644
index 00000000..0eb8d03e
--- /dev/null
+++ b/plugins/broadcast/memory/memory_test.go
@@ -0,0 +1,80 @@
+package memory
+import (
+ "testing"
+ ""
+func TestMemory_Broadcast(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello2")))
+ assert.Equal(t, `hello2`, readStr(<-client.Channel()))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello3")))
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello4")))
+ assert.Equal(t, `hello4`, readStr(<-client.Channel()))
+func TestMemory_BroadcastPattern(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, client.Publish(newMessage("topic/1", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello2")))
+ assert.Equal(t, `hello2`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("different", "hello4")))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello5")))
+ assert.Equal(t, `hello5`, readStr(<-client.Channel()))
+ assert.NoError(t, client.UnsubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/3", "hello6")))
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/4", "hello7")))
+ assert.Equal(t, `hello7`, readStr(<-client.Channel()))
+func TestMemory_NotActive(t *testing.T) {
+ b := memoryBroker()
+ b.stopped = 1
+ assert.Error(t, b.Publish(nil))
+ assert.Error(t, b.Subscribe(nil))
+ assert.Error(t, b.Unsubscribe(nil))
+ assert.Error(t, b.SubscribePattern(nil, ""))
+ assert.Error(t, b.UnsubscribePattern(nil, ""))
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..3cedf555
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,11 @@
+package broadcast
+type Plugin struct {
+func (p *Plugin) Init() error {
+ return nil
diff --git a/plugins/broadcast/redis/redis.go b/plugins/broadcast/redis/redis.go
new file mode 100644
index 00000000..41f48658
--- /dev/null
+++ b/plugins/broadcast/redis/redis.go
@@ -0,0 +1,172 @@
+package redis
+import (
+ "context"
+ "errors"
+ "sync/atomic"
+ ""
+// Redis based broadcast Router.
+type Redis struct {
+ client redis.UniversalClient
+ psClient redis.UniversalClient
+ router *Router
+ messages chan *Message
+ listen, leave chan subscriber
+ stop chan interface{}
+ stopped int32
+// creates new redis broker
+func redisBroker(cfg *RedisConfig) (*Redis, error) {
+ client := cfg.redisClient()
+ if _, err := client.Ping(context.Background()).Result(); err != nil {
+ return nil, err
+ }
+ psClient := cfg.redisClient()
+ if _, err := psClient.Ping(context.Background()).Result(); err != nil {
+ return nil, err
+ }
+ return &Redis{
+ client: client,
+ psClient: psClient,
+ router: NewRouter(),
+ messages: make(chan *Message),
+ listen: make(chan subscriber),
+ leave: make(chan subscriber),
+ stop: make(chan interface{}),
+ stopped: 0,
+ }, nil
+// Serve serves broker.
+func (r *Redis) Serve() error {
+ pubsub := r.psClient.Subscribe(context.Background())
+ channel := pubsub.Channel()
+ for {
+ select {
+ case ctx := <-r.listen:
+ ctx.done <- r.handleJoin(ctx, pubsub)
+ case ctx := <-r.leave:
+ ctx.done <- r.handleLeave(ctx, pubsub)
+ case msg := <-channel:
+ r.router.Dispatch(&Message{
+ Topic: msg.Channel,
+ Payload: []byte(msg.Payload),
+ })
+ case <-r.stop:
+ return nil
+ }
+ }
+func (r *Redis) handleJoin(sub subscriber, pubsub *redis.PubSub) error {
+ if sub.pattern != "" {
+ newPatterns, err := r.router.SubscribePattern(sub.upstream, sub.pattern)
+ if err != nil || len(newPatterns) == 0 {
+ return err
+ }
+ return pubsub.PSubscribe(context.Background(), newPatterns...)
+ }
+ newTopics := r.router.Subscribe(sub.upstream, sub.topics...)
+ if len(newTopics) == 0 {
+ return nil
+ }
+ return pubsub.Subscribe(context.Background(), newTopics...)
+func (r *Redis) handleLeave(sub subscriber, pubsub *redis.PubSub) error {
+ if sub.pattern != "" {
+ dropPatterns := r.router.UnsubscribePattern(sub.upstream, sub.pattern)
+ if len(dropPatterns) == 0 {
+ return nil
+ }
+ return pubsub.PUnsubscribe(context.Background(), dropPatterns...)
+ }
+ dropTopics := r.router.Unsubscribe(sub.upstream, sub.topics...)
+ if len(dropTopics) == 0 {
+ return nil
+ }
+ return pubsub.Unsubscribe(context.Background(), dropTopics...)
+// Stop closes the consumption and disconnects broker.
+func (r *Redis) Stop() {
+ if atomic.CompareAndSwapInt32(&r.stopped, 0, 1) {
+ close(r.stop)
+ }
+// Subscribe broker to one or multiple channels.
+func (r *Redis) Subscribe(upstream chan *Message, topics ...string) error {
+ if atomic.LoadInt32(&r.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
+ r.listen <- ctx
+ return <-ctx.done
+// SubscribePattern broker to pattern.
+func (r *Redis) SubscribePattern(upstream chan *Message, pattern string) error {
+ if atomic.LoadInt32(&r.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
+ r.listen <- ctx
+ return <-ctx.done
+// Unsubscribe broker from one or multiple channels.
+func (r *Redis) Unsubscribe(upstream chan *Message, topics ...string) error {
+ if atomic.LoadInt32(&r.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
+ r.leave <- ctx
+ return <-ctx.done
+// UnsubscribePattern broker from pattern.
+func (r *Redis) UnsubscribePattern(upstream chan *Message, pattern string) error {
+ if atomic.LoadInt32(&r.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
+ r.leave <- ctx
+ return <-ctx.done
+// Publish one or multiple Channel.
+func (r *Redis) Publish(messages ...*Message) error {
+ if atomic.LoadInt32(&r.stopped) == 1 {
+ return errors.New("broker has been stopped")
+ }
+ for _, msg := range messages {
+ if err := r.client.Publish(context.Background(), msg.Topic, []byte(msg.Payload)).Err(); err != nil {
+ return err
+ }
+ }
+ return nil
diff --git a/plugins/broadcast/redis/redis_test.go b/plugins/broadcast/redis/redis_test.go
new file mode 100644
index 00000000..37027e01
--- /dev/null
+++ b/plugins/broadcast/redis/redis_test.go
@@ -0,0 +1,98 @@
+package redis
+import (
+ "fmt"
+ "testing"
+ ""
+ ""
+ ""
+func TestRedis_Error(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ //c := service.NewContainer(logger)
+ //c.Register(rpc.ID, &rpc.Service{})
+ //c.Register(ID, &Service{})
+ //
+ //err := c.Init(&testCfg{
+ // broadcast: `{"redis":{"addr":"localhost:6372"}}`,
+ // rpc: fmt.Sprintf(`{"join":"tcp://:%v"}`, rpcPort),
+ //})
+ rpcPort++
+ assert.Error(t, err)
+func TestRedis_Broadcast(t *testing.T) {
+ br, _, c := setup(`{"redis":{"addr":"localhost:6379"}}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello2")))
+ assert.Equal(t, `hello2`, readStr(<-client.Channel()))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello3")))
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello4")))
+ assert.Equal(t, `hello4`, readStr(<-client.Channel()))
+func TestRedis_BroadcastPattern(t *testing.T) {
+ br, _, c := setup(`{"redis":{"addr":"localhost:6379"}}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) // must not be delivered
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello2")))
+ assert.Equal(t, `hello2`, readStr(<-client.Channel()))
+ assert.NoError(t, br.Broker().Publish(newMessage("different", "hello4")))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/2", "hello5")))
+ assert.Equal(t, `hello5`, readStr(<-client.Channel()))
+ assert.NoError(t, client.UnsubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/3", "hello6")))
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/4", "hello7")))
+ assert.Equal(t, `hello7`, readStr(<-client.Channel()))
+func TestRedis_NotActive(t *testing.T) {
+ b := &Redis{}
+ b.stopped = 1
+ assert.Error(t, b.Publish(nil))
+ assert.Error(t, b.Subscribe(nil))
+ assert.Error(t, b.Unsubscribe(nil))
+ assert.Error(t, b.SubscribePattern(nil, ""))
+ assert.Error(t, b.UnsubscribePattern(nil, ""))
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile
new file mode 100644
index 00000000..d88312d2
--- /dev/null
+++ b/plugins/broadcast/root/Makefile
@@ -0,0 +1,9 @@
+ rm -rf rr-jobbroadcast
+install: all
+ cp rr-broadcast /usr/local/bin/rr-broadcast
+ rm -f /usr/local/bin/rr-broadcast
+ composer update
+ go test -v -race -cover
diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/root/broker.go
new file mode 100644
index 00000000..923c8105
--- /dev/null
+++ b/plugins/broadcast/root/broker.go
@@ -0,0 +1,36 @@
+package broadcast
+import "encoding/json"
+// Broker defines the ability to operate as message passing broker.
+type Broker interface {
+ // Serve serves broker.
+ Serve() error
+ // Stop closes the consumption and disconnects broker.
+ Stop()
+ // Subscribe broker to one or multiple topics.
+ Subscribe(upstream chan *Message, topics ...string) error
+ // SubscribePattern broker to pattern.
+ SubscribePattern(upstream chan *Message, pattern string) error
+ // Unsubscribe broker from one or multiple topics.
+ Unsubscribe(upstream chan *Message, topics ...string) error
+ // UnsubscribePattern broker from pattern.
+ UnsubscribePattern(upstream chan *Message, pattern string) error
+ // Publish one or multiple Channel.
+ Publish(messages ...*Message) error
+// Message represent single message.
+type Message struct {
+ // Topic message been pushed into.
+ Topic string `json:"topic"`
+ // Payload to be broadcasted. Must be valid json when transferred over RPC.
+ Payload json.RawMessage `json:"payload"`
diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go
new file mode 100644
index 00000000..c5761f94
--- /dev/null
+++ b/plugins/broadcast/root/client.go
@@ -0,0 +1,133 @@
+package broadcast
+import "sync"
+// Client subscribes to a given topic and consumes or publish messages to it.
+type Client struct {
+ upstream chan *Message
+ broker Broker
+ mu sync.Mutex
+ topics []string
+ patterns []string
+// Channel returns incoming messages channel.
+func (c *Client) Channel() chan *Message {
+ return c.upstream
+// Publish message into associated topic or topics.
+func (c *Client) Publish(msg ...*Message) error {
+ return
+// Subscribe client to specific topics.
+func (c *Client) Subscribe(topics ...string) error {
+ defer
+ newTopics := make([]string, 0)
+ for _, topic := range topics {
+ found := false
+ for _, e := range c.topics {
+ if e == topic {
+ found = true
+ break
+ }
+ }
+ if !found {
+ newTopics = append(newTopics, topic)
+ }
+ }
+ if len(newTopics) == 0 {
+ return nil
+ }
+ c.topics = append(c.topics, newTopics...)
+ return, newTopics...)
+// SubscribePattern subscribe client to the specific topic pattern.
+func (c *Client) SubscribePattern(pattern string) error {
+ defer
+ for _, g := range c.patterns {
+ if g == pattern {
+ return nil
+ }
+ }
+ c.patterns = append(c.patterns, pattern)
+ return, pattern)
+// Unsubscribe client from specific topics
+func (c *Client) Unsubscribe(topics ...string) error {
+ defer
+ dropTopics := make([]string, 0)
+ for _, topic := range topics {
+ for i, e := range c.topics {
+ if e == topic {
+ c.topics = append(c.topics[:i], c.topics[i+1:]...)
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+ if len(dropTopics) == 0 {
+ return nil
+ }
+ return, dropTopics...)
+// UnsubscribePattern client from topic pattern.
+func (c *Client) UnsubscribePattern(pattern string) error {
+ defer
+ for i := range c.patterns {
+ if c.patterns[i] == pattern {
+ c.patterns = append(c.patterns[:i], c.patterns[i+1:]...)
+ return, pattern)
+ }
+ }
+ return nil
+// Topics return all the topics client subscribed to.
+func (c *Client) Topics() []string {
+ defer
+ return c.topics
+// Patterns return all the patterns client subscribed to.
+func (c *Client) Patterns() []string {
+ defer
+ return c.patterns
+// Close the client and consumption.
+func (c *Client) Close() (err error) {
+ defer
+ if len(c.topics) != 0 {
+ err =, c.topics...)
+ }
+ close(c.upstream)
+ return err
diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go
new file mode 100644
index 00000000..52a50d57
--- /dev/null
+++ b/plugins/broadcast/root/client_test.go
@@ -0,0 +1,59 @@
+package broadcast
+import (
+ "testing"
+ ""
+func Test_Client_Topics(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.Equal(t, []string{}, client.Topics())
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+ assert.NoError(t,, "topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t,, "topic"))
+ assert.Equal(t, []string{}, client.Topics())
+func Test_Client_Patterns(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ assert.Equal(t, []string{}, client.Patterns())
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.Equal(t, []string{"topic/*"}, client.Patterns())
+ assert.NoError(t,, "topic/*"))
+ assert.Equal(t, []string{"topic/*"}, client.Patterns())
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+ assert.NoError(t, client.UnsubscribePattern("topic/*"))
+ assert.NoError(t,, "topic/*"))
+ assert.Equal(t, []string{}, client.Patterns())
diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go
new file mode 100644
index 00000000..8c732441
--- /dev/null
+++ b/plugins/broadcast/root/config.go
@@ -0,0 +1,61 @@
+package broadcast
+import (
+ "errors"
+ ""
+// Config configures the broadcast extension.
+type Config struct {
+ // RedisConfig configures redis broker.
+ Redis *RedisConfig
+// Hydrate reads the configuration values from the source configuration.
+//func (c *Config) Hydrate(cfg service.Config) error {
+// if err := cfg.Unmarshal(c); err != nil {
+// return err
+// }
+// if c.Redis != nil {
+// return c.Redis.isValid()
+// }
+// return nil
+// InitDefaults enables in memory broadcast configuration.
+func (c *Config) InitDefaults() error {
+ return nil
+// RedisConfig configures redis broker.
+type RedisConfig struct {
+ // Addr of the redis server.
+ Addr string
+ // Password to redis server.
+ Password string
+ // DB index.
+ DB int
+// clusterOptions
+func (cfg *RedisConfig) redisClient() redis.UniversalClient {
+ return redis.NewClient(&redis.Options{
+ Addr: cfg.Addr,
+ Password: cfg.Password,
+ PoolSize: 2,
+ })
+// check if redis config is valid.
+func (cfg *RedisConfig) isValid() error {
+ if cfg.Addr == "" {
+ return errors.New("redis addr is required")
+ }
+ return nil
diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go
new file mode 100644
index 00000000..28191c6b
--- /dev/null
+++ b/plugins/broadcast/root/config_test.go
@@ -0,0 +1,60 @@
+package broadcast
+import (
+ "encoding/json"
+ "testing"
+ ""
+ ""
+ ""
+type testCfg struct {
+ rpc string
+ broadcast string
+ target string
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.broadcast}
+ }
+ if name == rpc.ID {
+ return &testCfg{target: cfg.rpc}
+ }
+ return nil
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ return json.Unmarshal([]byte(, out)
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &testCfg{target: `{"dead`}
+ c := &Config{}
+ assert.Error(t, c.Hydrate(cfg))
+func Test_Config_Hydrate_OK(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path"}`}
+ c := &Config{}
+ assert.NoError(t, c.Hydrate(cfg))
+func Test_Config_Redis_Error(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path","redis":{}}`}
+ c := &Config{}
+ assert.Error(t, c.Hydrate(cfg))
+func Test_Config_Redis_OK(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`}
+ c := &Config{}
+ assert.NoError(t, c.Hydrate(cfg))
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
new file mode 100644
index 00000000..91137f8b
--- /dev/null
+++ b/plugins/broadcast/root/router.go
@@ -0,0 +1,170 @@
+package broadcast
+//import ""
+// Router performs internal message routing to multiple subscribers.
+type Router struct {
+ wildcard map[string]wildcard
+ routes map[string][]chan *Message
+// wildcard handles number of topics via glob pattern.
+type wildcard struct {
+ //glob glob.Glob
+ upstream []chan *Message
+// helper for blocking join/leave flow
+type subscriber struct {
+ upstream chan *Message
+ done chan error
+ topics []string
+ pattern string
+// NewRouter creates new topic and pattern router.
+func NewRouter() *Router {
+ return &Router{
+ wildcard: make(map[string]wildcard),
+ routes: make(map[string][]chan *Message),
+ }
+// Dispatch to all connected topics.
+func (r *Router) Dispatch(msg *Message) {
+ for _, w := range r.wildcard {
+ if w.glob.Match(msg.Topic) {
+ for _, upstream := range w.upstream {
+ upstream <- msg
+ }
+ }
+ }
+ if routes, ok := r.routes[msg.Topic]; ok {
+ for _, upstream := range routes {
+ upstream <- msg
+ }
+ }
+// Subscribe to topic and return list of newly assigned topics.
+func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
+ newTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ r.routes[topic] = []chan *Message{upstream}
+ if !r.collapsed(topic) {
+ newTopics = append(newTopics, topic)
+ }
+ continue
+ }
+ joined := false
+ for _, up := range r.routes[topic] {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+ if !joined {
+ r.routes[topic] = append(r.routes[topic], upstream)
+ }
+ }
+ return newTopics
+// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
+func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
+ dropTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ // no such topic, ignore
+ continue
+ }
+ for i := range r.routes[topic] {
+ if r.routes[topic][i] == upstream {
+ r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
+ break
+ }
+ }
+ if len(r.routes[topic]) == 0 {
+ delete(r.routes, topic)
+ // standalone empty subscription
+ if !r.collapsed(topic) {
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+ return dropTopics
+// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
+// case if blob is invalid.
+func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
+ if w, ok := r.wildcard[pattern]; ok {
+ joined := false
+ for _, up := range w.upstream {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+ if !joined {
+ w.upstream = append(w.upstream, upstream)
+ }
+ return nil, nil
+ }
+ g, err := glob.Compile(pattern)
+ if err != nil {
+ return nil, err
+ }
+ r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
+ return []string{pattern}, nil
+// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
+func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
+ // todo: store and return collapsed topics
+ w, ok := r.wildcard[pattern]
+ if !ok {
+ // no such pattern
+ return nil
+ }
+ for i, up := range w.upstream {
+ if up == upstream {
+ w.upstream[i] = w.upstream[len(w.upstream)-1]
+ w.upstream[len(w.upstream)-1] = nil
+ w.upstream = w.upstream[:len(w.upstream)-1]
+ if len(w.upstream) == 0 {
+ delete(r.wildcard, pattern)
+ return []string{pattern}
+ }
+ }
+ }
+ return nil
+func (r *Router) collapsed(topic string) bool {
+ for _, w := range r.wildcard {
+ if w.glob.Match(topic) {
+ return true
+ }
+ }
+ return false
diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go
new file mode 100644
index 00000000..5604a574
--- /dev/null
+++ b/plugins/broadcast/root/rpc.go
@@ -0,0 +1,25 @@
+package broadcast
+import ""
+type rpcService struct {
+ svc *Service
+// Publish Messages.
+func (r *rpcService) Publish(msg []*Message, ok *bool) error {
+ *ok = true
+ return r.svc.Publish(msg...)
+// Publish Messages in async mode. Blocks until get an err or nil from publish
+func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error {
+ *ok = true
+ g := &errgroup.Group{}
+ g.Go(func() error {
+ return r.svc.Publish(msg...)
+ })
+ return g.Wait()
diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go
new file mode 100644
index 00000000..157c4e70
--- /dev/null
+++ b/plugins/broadcast/root/rpc_test.go
@@ -0,0 +1,72 @@
+package broadcast
+import (
+ "testing"
+ ""
+func TestRPC_Broadcast(t *testing.T) {
+ br, rpc, c := setup(`{}`)
+ defer c.Stop()
+ client := br.NewClient()
+ defer client.Close()
+ rcpClient, err := rpc.Client()
+ assert.NoError(t, err)
+ // must not be delivered
+ ok := false
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello1"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello1"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello1"`, readStr(<-client.Channel()))
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello2"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello2"`, readStr(<-client.Channel()))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello3"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello4"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello4"`, readStr(<-client.Channel()))
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.PublishAsync",
+ []*Message{newMessage("topic", `"hello5"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello5"`, readStr(<-client.Channel()))
diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go
new file mode 100644
index 00000000..8b175b3e
--- /dev/null
+++ b/plugins/broadcast/root/service.go
@@ -0,0 +1,85 @@
+package broadcast
+import (
+ "errors"
+ "sync"
+ ""
+// ID defines public service name.
+const ID = "broadcast"
+// Service manages even broadcasting and websocket interface.
+type Service struct {
+ // service and broker configuration
+ cfg *Config
+ // broker
+ mu sync.Mutex
+ broker Broker
+// Init service.
+func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) {
+ s.cfg = cfg
+ if rpc != nil {
+ if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
+ return false, err
+ }
+ }
+ if s.cfg.Redis != nil {
+ if, err = redisBroker(s.cfg.Redis); err != nil {
+ return false, err
+ }
+ } else {
+ = memoryBroker()
+ }
+ return true, nil
+// Serve broadcast broker.
+func (s *Service) Serve() (err error) {
+ return
+// Stop closes broadcast broker.
+func (s *Service) Stop() {
+ broker := s.Broker()
+ if broker != nil {
+ broker.Stop()
+ }
+// Broker returns associated broker.
+func (s *Service) Broker() Broker {
+ defer
+ return
+// NewClient returns single connected client with ability to consume or produce into associated topic(svc).
+func (s *Service) NewClient() *Client {
+ return &Client{
+ upstream: make(chan *Message),
+ broker: s.Broker(),
+ topics: make([]string, 0),
+ patterns: make([]string, 0),
+ }
+// Publish one or multiple Channel.
+func (s *Service) Publish(msg ...*Message) error {
+ broker := s.Broker()
+ if broker == nil {
+ return errors.New("no stopped broker")
+ }
+ return s.Broker().Publish(msg...)
diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go
new file mode 100644
index 00000000..10b924cc
--- /dev/null
+++ b/plugins/broadcast/root/service_test.go
@@ -0,0 +1,65 @@
+package broadcast
+import (
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+ ""
+ ""
+ ""
+ ""
+ ""
+var rpcPort = 6010
+func setup(cfg string) (*Service, *rpc.Service, service.Container) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+ err := c.Init(&testCfg{
+ broadcast: cfg,
+ rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort),
+ })
+ rpcPort++
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ err = c.Serve()
+ if err != nil {
+ panic(err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
+ b, _ := c.Get(ID)
+ br := b.(*Service)
+ r, _ := c.Get(rpc.ID)
+ rp := r.(*rpc.Service)
+ return br, rp, c
+func readStr(m *Message) string {
+ return strings.TrimRight(string(m.Payload), "\n")
+func newMessage(t, m string) *Message {
+ return &Message{Topic: t, Payload: []byte(m)}
+func TestService_Publish(t *testing.T) {
+ svc := &Service{}
+ assert.Error(t, svc.Publish(nil))
diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml
new file mode 100644
index 00000000..c35a12fc
--- /dev/null
+++ b/plugins/broadcast/root/tests/.rr.yaml
@@ -0,0 +1,2 @@
+ redis.addr: "localhost:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
new file mode 100644
index 00000000..d6014bf0
--- /dev/null
+++ b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
@@ -0,0 +1,56 @@
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+namespace Spiral\Broadcast\Tests;
+use PHPUnit\Framework\TestCase;
+use Spiral\Broadcast\Broadcast;
+use Spiral\Broadcast\Exception\BroadcastException;
+use Spiral\Broadcast\Message;
+use Spiral\Goridge\RPC;
+use Spiral\Goridge\SocketRelay;
+class BroadcastTest extends TestCase
+ public function testBroadcast(): void
+ {
+ $rpc = new RPC(new SocketRelay('localhost', 6001));
+ $br = new Broadcast($rpc);
+ $br->publish(
+ new Message('tests/topic', 'hello'),
+ new Message('tests/123', ['key' => 'value'])
+ );
+ while (filesize(__DIR__ . '/../log.txt') < 40) {
+ clearstatcache(true, __DIR__ . '/../log.txt');
+ usleep(1000);
+ }
+ clearstatcache(true, __DIR__ . '/../log.txt');
+ $content = file_get_contents(__DIR__ . '/../log.txt');
+ $this->assertSame('tests/topic: "hello"
+tests/123: {"key":"value"}
+', $content);
+ }
+ public function testBroadcastException(): void
+ {
+ $rpc = new RPC(new SocketRelay('localhost', 6002));
+ $br = new Broadcast($rpc);
+ $this->expectException(BroadcastException::class);
+ $br->publish(
+ new Message('topic', 'hello')
+ );
+ }
diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
new file mode 100644
index 00000000..dd9e1cc3
--- /dev/null
+++ b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
@@ -0,0 +1,24 @@
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+namespace Spiral\Broadcast\Tests;
+use PHPUnit\Framework\TestCase;
+use Spiral\Broadcast\Message;
+class MessageTest extends TestCase
+ public function testSerialize(): void
+ {
+ $m = new Message('topic', ['hello' => 'world']);
+ $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m));
+ }
diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php
new file mode 100644
index 00000000..d0dfb88b
--- /dev/null
+++ b/plugins/broadcast/root/tests/bootstrap.php
@@ -0,0 +1,15 @@
+ * Spiral Framework, SpiralScout LLC.
+ *
+ * @author Anton Titov (Wolfy-J)
+ */
+error_reporting(E_ALL | E_STRICT);
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . '/vendor_php/autoload.php';
diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml
new file mode 100644
index 00000000..123aa9b9
--- /dev/null
+++ b/plugins/broadcast/root/tests/docker-compose.yml
@@ -0,0 +1,9 @@
+version: '3'
+ redis:
+ image: 'bitnami/redis:latest'
+ environment:
+ ports:
+ - "6379:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go
new file mode 100644
index 00000000..21442a01
--- /dev/null
+++ b/plugins/broadcast/root/tests/go-client.go
@@ -0,0 +1,78 @@
+package main
+import (
+ "fmt"
+ "os"
+ ""
+ rr ""
+ ""
+ ""
+type logService struct {
+ broadcast *broadcast.Service
+ stop chan interface{}
+func (l *logService) Init(service *broadcast.Service) (bool, error) {
+ l.broadcast = service
+ return true, nil
+func (l *logService) Serve() error {
+ l.stop = make(chan interface{})
+ client := l.broadcast.NewClient()
+ if err := client.SubscribePattern("tests/*"); err != nil {
+ return err
+ }
+ logFile, _ := os.Create("log.txt")
+ g := &errgroup.Group{}
+ g.Go(func() error {
+ for msg := range client.Channel() {
+ _, err := logFile.Write([]byte(fmt.Sprintf(
+ "%s: %s\n",
+ msg.Topic,
+ string(msg.Payload),
+ )))
+ if err != nil {
+ return err
+ }
+ err = logFile.Sync()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ <-l.stop
+ err := logFile.Close()
+ if err != nil {
+ return err
+ }
+ err = client.Close()
+ if err != nil {
+ return err
+ }
+ return g.Wait()
+func (l *logService) Stop() {
+ close(l.stop)
+func main() {
+ rr.Container.Register(rpc.ID, &rpc.Service{})
+ rr.Container.Register(broadcast.ID, &broadcast.Service{})
+ rr.Container.Register("log", &logService{})
+ rr.Execute()
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..aa270f64
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1 @@
+package broadcast
diff --git a/plugins/broadcast/websockets/Makefile b/plugins/broadcast/websockets/Makefile
new file mode 100644
index 00000000..f32efbdb
--- /dev/null
+++ b/plugins/broadcast/websockets/Makefile
@@ -0,0 +1,2 @@
+ go test -v -race -cover
diff --git a/plugins/broadcast/websockets/access_validator.go b/plugins/broadcast/websockets/access_validator.go
new file mode 100644
index 00000000..bf27386d
--- /dev/null
+++ b/plugins/broadcast/websockets/access_validator.go
@@ -0,0 +1,102 @@
+package websockets
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "strings"
+ ""
+type accessValidator struct {
+ buffer *bytes.Buffer
+ header http.Header
+ status int
+func newValidator() *accessValidator {
+ return &accessValidator{
+ buffer: bytes.NewBuffer(nil),
+ header: make(http.Header),
+ }
+// copy all content to parent response writer.
+func (w *accessValidator) copy(rw http.ResponseWriter) {
+ rw.WriteHeader(w.status)
+ for k, v := range w.header {
+ for _, vv := range v {
+ rw.Header().Add(k, vv)
+ }
+ }
+ _, _ = io.Copy(rw, w.buffer)
+// Header returns the header map that will be sent by WriteHeader.
+func (w *accessValidator) Header() http.Header {
+ return w.header
+// Write writes the data to the connection as part of an HTTP reply.
+func (w *accessValidator) Write(p []byte) (int, error) {
+ return w.buffer.Write(p)
+// WriteHeader sends an HTTP response header with the provided status code.
+func (w *accessValidator) WriteHeader(statusCode int) {
+ w.status = statusCode
+// IsOK returns true if response contained 200 status code.
+func (w *accessValidator) IsOK() bool {
+ return w.status == 200
+// Body returns response body to rely to user.
+func (w *accessValidator) Body() []byte {
+ return w.buffer.Bytes()
+// Error contains server response.
+func (w *accessValidator) Error() string {
+ return w.buffer.String()
+// assertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in
+// case of error
+func (w *accessValidator) assertServerAccess(f http.HandlerFunc, r *http.Request) error {
+ if err := attributes.Set(r, "ws:joinServer", true); err != nil {
+ return err
+ }
+ defer delete(attributes.All(r), "ws:joinServer")
+ f(w, r)
+ if !w.IsOK() {
+ return w
+ }
+ return nil
+// assertAccess checks if user can access given upstream, the application will receive all user headers and cookies.
+// the decision to authorize user will be based on response code (200).
+func (w *accessValidator) assertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
+ if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
+ return err
+ }
+ defer delete(attributes.All(r), "ws:joinTopics")
+ f(w, r)
+ if !w.IsOK() {
+ return w
+ }
+ return nil
diff --git a/plugins/broadcast/websockets/access_validator_test.go b/plugins/broadcast/websockets/access_validator_test.go
new file mode 100644
index 00000000..41372727
--- /dev/null
+++ b/plugins/broadcast/websockets/access_validator_test.go
@@ -0,0 +1,35 @@
+package websockets
+import (
+ "testing"
+ ""
+func TestResponseWrapper_Body(t *testing.T) {
+ w := newValidator()
+ _, _ =w.Write([]byte("hello"))
+ assert.Equal(t, []byte("hello"), w.Body())
+func TestResponseWrapper_Header(t *testing.T) {
+ w := newValidator()
+ w.Header().Set("k", "value")
+ assert.Equal(t, "value", w.Header().Get("k"))
+func TestResponseWrapper_StatusCode(t *testing.T) {
+ w := newValidator()
+ w.WriteHeader(200)
+ assert.True(t, w.IsOK())
+func TestResponseWrapper_StatusCodeBad(t *testing.T) {
+ w := newValidator()
+ w.WriteHeader(400)
+ assert.False(t, w.IsOK())
diff --git a/plugins/broadcast/websockets/config.go b/plugins/broadcast/websockets/config.go
new file mode 100644
index 00000000..8a71c7af
--- /dev/null
+++ b/plugins/broadcast/websockets/config.go
@@ -0,0 +1,21 @@
+package websockets
+// Config defines the websocket service configuration.
+type Config struct {
+ // Path defines on this URL the middleware must be activated. Same path must
+ // be handled by underlying application kernel to authorize the consumption.
+ Path string
+ // NoOrigin disables origin check, only for debug.
+ NoOrigin bool
+// Hydrate reads the configuration values from the source configuration.
+//func (c *Config) Hydrate(cfg service.Config) error {
+// if err := cfg.Unmarshal(c); err != nil {
+// return err
+// }
+// return nil
diff --git a/plugins/broadcast/websockets/config_test.go b/plugins/broadcast/websockets/config_test.go
new file mode 100644
index 00000000..e646fdc4
--- /dev/null
+++ b/plugins/broadcast/websockets/config_test.go
@@ -0,0 +1,34 @@
+package websockets
+import (
+ "encoding/json"
+ "testing"
+ ""
+ ""
+type mockCfg struct{ cfg string }
+func (cfg *mockCfg) Get(name string) service.Config {
+ if name == "same" || name == "jobs" {
+ return cfg
+ }
+ return nil
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &mockCfg{cfg: `{"dead`}
+ c := &Config{}
+ assert.Error(t, c.Hydrate(cfg))
+func Test_Config_Hydrate_OK(t *testing.T) {
+ cfg := &mockCfg{cfg: `{"path":"/path"}`}
+ c := &Config{}
+ assert.NoError(t, c.Hydrate(cfg))
diff --git a/plugins/broadcast/websockets/conn_context.go b/plugins/broadcast/websockets/conn_context.go
new file mode 100644
index 00000000..f7d62833
--- /dev/null
+++ b/plugins/broadcast/websockets/conn_context.go
@@ -0,0 +1,66 @@
+package websockets
+import (
+ "encoding/json"
+ ""
+// ConnContext carries information about websocket connection and it's topics.
+type ConnContext struct {
+ // Conn to the client.
+ Conn *websocket.Conn
+ // Topics contain list of currently subscribed topics.
+ Topics []string
+ // upstream to push messages into.
+ upstream chan *broadcast.Message
+// SendMessage message directly to the client.
+func (ctx *ConnContext) SendMessage(topic string, payload interface{}) (err error) {
+ msg := &broadcast.Message{Topic: topic}
+ msg.Payload, err = json.Marshal(payload)
+ if err == nil {
+ ctx.upstream <- msg
+ }
+ return err
+func (ctx *ConnContext) serve(errHandler func(err error, conn *websocket.Conn)) {
+ for msg := range ctx.upstream {
+ if err := ctx.Conn.WriteJSON(msg); err != nil {
+ errHandler(err, ctx.Conn)
+ }
+ }
+func (ctx *ConnContext) addTopics(topics ...string) {
+ for _, topic := range topics {
+ found := false
+ for _, e := range ctx.Topics {
+ if e == topic {
+ found = true
+ break
+ }
+ }
+ if !found {
+ ctx.Topics = append(ctx.Topics, topic)
+ }
+ }
+func (ctx *ConnContext) dropTopic(topics ...string) {
+ for _, topic := range topics {
+ for i, e := range ctx.Topics {
+ if e == topic {
+ ctx.Topics[i] = ctx.Topics[len(ctx.Topics)-1]
+ ctx.Topics = ctx.Topics[:len(ctx.Topics)-1]
+ }
+ }
+ }
diff --git a/plugins/broadcast/websockets/conn_context_test.go b/plugins/broadcast/websockets/conn_context_test.go
new file mode 100644
index 00000000..466aaa30
--- /dev/null
+++ b/plugins/broadcast/websockets/conn_context_test.go
@@ -0,0 +1,28 @@
+package websockets
+import (
+ "testing"
+ ""
+func TestConnContext_ManageTopics(t *testing.T) {
+ ctx := &ConnContext{Topics: make([]string, 0)}
+ assert.Equal(t, []string{}, ctx.Topics)
+ ctx.addTopics("a", "b")
+ assert.Equal(t, []string{"a", "b"}, ctx.Topics)
+ ctx.addTopics("a", "c")
+ assert.Equal(t, []string{"a", "b", "c"}, ctx.Topics)
+ ctx.dropTopic("b", "c")
+ assert.Equal(t, []string{"a"}, ctx.Topics)
+ ctx.dropTopic("b", "c")
+ assert.Equal(t, []string{"a"}, ctx.Topics)
+ ctx.dropTopic("a")
+ assert.Equal(t, []string{}, ctx.Topics)
diff --git a/plugins/broadcast/websockets/conn_pool.go b/plugins/broadcast/websockets/conn_pool.go
new file mode 100644
index 00000000..80092a44
--- /dev/null
+++ b/plugins/broadcast/websockets/conn_pool.go
@@ -0,0 +1,125 @@
+package websockets
+import (
+ "errors"
+ "sync"
+ ""
+ ""
+// manages a set of websocket connections
+type connPool struct {
+ errHandler func(err error, conn *websocket.Conn)
+ mur sync.Mutex
+ client *broadcast.Client
+ router *broadcast.Router
+ mu sync.Mutex
+ conns map[*websocket.Conn]*ConnContext
+// create new connection pool
+func newPool(client *broadcast.Client, errHandler func(err error, conn *websocket.Conn)) *connPool {
+ cp := &connPool{
+ client: client,
+ router: broadcast.NewRouter(),
+ errHandler: errHandler,
+ conns: map[*websocket.Conn]*ConnContext{},
+ }
+ go func() {
+ for msg := range cp.client.Channel() {
+ cp.mur.Lock()
+ cp.router.Dispatch(msg)
+ cp.mur.Unlock()
+ }
+ }()
+ return cp
+// connect the websocket and register client in message router
+func (cp *connPool) connect(conn *websocket.Conn) (*ConnContext, error) {
+ ctx := &ConnContext{
+ Conn: conn,
+ Topics: []string{},
+ upstream: make(chan *broadcast.Message),
+ }
+ cp.conns[conn] = ctx
+ go ctx.serve(cp.errHandler)
+ return ctx, nil
+// disconnect the websocket
+func (cp *connPool) disconnect(conn *websocket.Conn) error {
+ defer
+ ctx, ok := cp.conns[conn]
+ if !ok {
+ return errors.New("no such connection")
+ }
+ if err := cp.unsubscribe(ctx, ctx.Topics...); err != nil {
+ cp.errHandler(err, conn)
+ }
+ delete(cp.conns, conn)
+ return conn.Close()
+// subscribe the connection
+func (cp *connPool) subscribe(ctx *ConnContext, topics ...string) error {
+ cp.mur.Lock()
+ defer cp.mur.Unlock()
+ ctx.addTopics(topics...)
+ newTopics := cp.router.Subscribe(ctx.upstream, topics...)
+ if len(newTopics) != 0 {
+ return cp.client.Subscribe(newTopics...)
+ }
+ return nil
+// unsubscribe the connection
+func (cp *connPool) unsubscribe(ctx *ConnContext, topics ...string) error {
+ cp.mur.Lock()
+ defer cp.mur.Unlock()
+ ctx.dropTopic(topics...)
+ dropTopics := cp.router.Unsubscribe(ctx.upstream, topics...)
+ if len(dropTopics) != 0 {
+ return cp.client.Unsubscribe(dropTopics...)
+ }
+ return nil
+// close the connection pool and disconnect all listeners
+func (cp *connPool) close() {
+ defer
+ for conn, ctx := range cp.conns {
+ if err := cp.unsubscribe(ctx, ctx.Topics...); err != nil {
+ cp.errHandler(err, conn)
+ }
+ delete(cp.conns, conn)
+ if err := conn.Close(); err != nil {
+ cp.errHandler(err, conn)
+ }
+ }
diff --git a/plugins/broadcast/websockets/event.go b/plugins/broadcast/websockets/event.go
new file mode 100644
index 00000000..3634bb89
--- /dev/null
+++ b/plugins/broadcast/websockets/event.go
@@ -0,0 +1,40 @@
+package websockets
+import (
+ ""
+const (
+ // EventConnect fired when new client is connected, the context is *websocket.Conn.
+ EventConnect = iota + 2500
+ // EventDisconnect fired when websocket is disconnected, context is empty.
+ EventDisconnect
+ // EventJoin caused when topics are being consumed, context if *TopicEvent.
+ EventJoin
+ // EventLeave caused when topic consumption are stopped, context if *TopicEvent.
+ EventLeave
+ // EventError when any broadcast error occurred, the context is *ErrorEvent.
+ EventError
+// ErrorEvent represents singular broadcast error event.
+type ErrorEvent struct {
+ // Conn specific to the error.
+ Conn *websocket.Conn
+ // Error contains job specific error.
+ Error error
+// TopicEvent caused when topic is joined or left.
+type TopicEvent struct {
+ // Conn associated with topics.
+ Conn *websocket.Conn
+ // Topics specific to event.
+ Topics []string
diff --git a/plugins/broadcast/websockets/rpc.go b/plugins/broadcast/websockets/rpc.go
new file mode 100644
index 00000000..1c62b902
--- /dev/null
+++ b/plugins/broadcast/websockets/rpc.go
@@ -0,0 +1,17 @@
+package websockets
+type rpcService struct {
+ svc *Service
+// Subscribe subscribes broadcast client to the given topic ahead of any websocket connections.
+func (r *rpcService) Subscribe(topic string, ok *bool) error {
+ *ok = true
+ return r.svc.client.Subscribe(topic)
+// SubscribePattern subscribes broadcast client to
+func (r *rpcService) SubscribePattern(pattern string, ok *bool) error {
+ *ok = true
+ return r.svc.client.SubscribePattern(pattern)
diff --git a/plugins/broadcast/websockets/service.go b/plugins/broadcast/websockets/service.go
new file mode 100644
index 00000000..f3c0c781
--- /dev/null
+++ b/plugins/broadcast/websockets/service.go
@@ -0,0 +1,228 @@
+package websockets
+import (
+ "encoding/json"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ ""
+// ID defines service id.
+const ID = "ws"
+// Service to manage websocket clients.
+type Service struct {
+ cfg *Config
+ upgrade websocket.Upgrader
+ client *broadcast.Client
+ connPool *connPool
+ listeners []func(event int, ctx interface{})
+ mu sync.Mutex
+ stopped int32
+ stop chan error
+// AddListener attaches server event controller.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.listeners = append(s.listeners, l)
+// Init the service.
+func (s *Service) Init(
+ cfg *Config,
+ env env.Environment,
+ rttp *rhttp.Service,
+ rpc *rpc.Service,
+ broadcast *broadcast.Service,
+) (bool, error) {
+ if broadcast == nil || rpc == nil {
+ // unable to activate
+ return false, nil
+ }
+ s.cfg = cfg
+ s.client = broadcast.NewClient()
+ s.connPool = newPool(s.client, s.reportError)
+ s.stopped = 0
+ if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
+ return false, err
+ }
+ if env != nil {
+ // ensure that underlying kernel knows what route to handle
+ env.SetEnv("RR_BROADCAST_PATH", cfg.Path)
+ }
+ // init all this stuff
+ s.upgrade = websocket.Upgrader{}
+ if s.cfg.NoOrigin {
+ s.upgrade.CheckOrigin = func(r *http.Request) bool {
+ return true
+ }
+ }
+ rttp.AddMiddleware(s.middleware)
+ return true, nil
+// Serve the websocket connections.
+func (s *Service) Serve() error {
+ defer s.client.Close()
+ defer s.connPool.close()
+ s.stop = make(chan error)
+ return <-s.stop
+// Stop the service and disconnect all connections.
+func (s *Service) Stop() {
+ defer
+ if atomic.CompareAndSwapInt32(&s.stopped, 0, 1) {
+ close(s.stop)
+ }
+// middleware intercepts websocket connections.
+func (s *Service) middleware(f http.HandlerFunc) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != s.cfg.Path {
+ f(w, r)
+ return
+ }
+ // checking server access
+ if err := newValidator().assertServerAccess(f, r); err != nil {
+ // show the error to the user
+ if av, ok := err.(*accessValidator); ok {
+ av.copy(w)
+ } else {
+ w.WriteHeader(400)
+ }
+ return
+ }
+ conn, err := s.upgrade.Upgrade(w, r, nil)
+ if err != nil {
+ s.reportError(err, nil)
+ return
+ }
+ s.throw(EventConnect, conn)
+ // manage connection
+ ctx, err := s.connPool.connect(conn)
+ if err != nil {
+ s.reportError(err, conn)
+ return
+ }
+ s.serveConn(ctx, f, r)
+ }
+// send and receive messages over websocket
+func (s *Service) serveConn(ctx *ConnContext, f http.HandlerFunc, r *http.Request) {
+ defer func() {
+ if err := s.connPool.disconnect(ctx.Conn); err != nil {
+ s.reportError(err, ctx.Conn)
+ }
+ s.throw(EventDisconnect, ctx.Conn)
+ }()
+ s.handleCommands(ctx, f, r)
+func (s *Service) handleCommands(ctx *ConnContext, f http.HandlerFunc, r *http.Request) {
+ cmd := &broadcast.Message{}
+ for {
+ if err := ctx.Conn.ReadJSON(cmd); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ switch cmd.Topic {
+ case "join":
+ topics := make([]string, 0)
+ if err := unmarshalCommand(cmd, &topics); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ if len(topics) == 0 {
+ continue
+ }
+ if err := newValidator().assertTopicsAccess(f, r, topics...); err != nil {
+ s.reportError(err, ctx.Conn)
+ if err := ctx.SendMessage("#join", topics); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ continue
+ }
+ if err := s.connPool.subscribe(ctx, topics...); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ if err := ctx.SendMessage("@join", topics); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ s.throw(EventJoin, &TopicEvent{Conn: ctx.Conn, Topics: topics})
+ case "leave":
+ topics := make([]string, 0)
+ if err := unmarshalCommand(cmd, &topics); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ if len(topics) == 0 {
+ continue
+ }
+ if err := s.connPool.unsubscribe(ctx, topics...); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ if err := ctx.SendMessage("@leave", topics); err != nil {
+ s.reportError(err, ctx.Conn)
+ return
+ }
+ s.throw(EventLeave, &TopicEvent{Conn: ctx.Conn, Topics: topics})
+ }
+ }
+// handle connection error
+func (s *Service) reportError(err error, conn *websocket.Conn) {
+ s.throw(EventError, &ErrorEvent{Conn: conn, Error: err})
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.listeners {
+ l(event, ctx)
+ }
+// unmarshalCommand command data.
+func unmarshalCommand(msg *broadcast.Message, v interface{}) error {
+ return json.Unmarshal(msg.Payload, v)
diff --git a/plugins/broadcast/websockets/service_test.go b/plugins/broadcast/websockets/service_test.go
new file mode 100644
index 00000000..911efc38
--- /dev/null
+++ b/plugins/broadcast/websockets/service_test.go
@@ -0,0 +1,706 @@
+package websockets
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "strings"
+ "testing"
+ "time"
+ ""
+ ""
+ ""
+ ""
+ ""
+ ""
+ rrhttp ""
+ ""
+ ""
+type testCfg struct {
+ http string
+ rpc string
+ ws string
+ broadcast string
+ target string
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == rrhttp.ID {
+ return &testCfg{target: cfg.http}
+ }
+ if name == ID {
+ return &testCfg{target:}
+ }
+ if name == rpc.ID {
+ return &testCfg{target: cfg.rpc}
+ }
+ if name == broadcast.ID {
+ return &testCfg{target: cfg.broadcast}
+ }
+ return nil
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ return json.Unmarshal([]byte(, out)
+func readStr(m interface{}) string {
+ return strings.TrimRight(string(m.([]byte)), "\n")
+func Test_HttpService_Echo(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6041",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 3000)
+ defer c.Stop()
+ req, err := http.NewRequest("GET", "http://localhost:6041/", nil)
+ assert.NoError(t, err)
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ _ = r.Body.Close()
+ }()
+ b, _ := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, []byte(""), b)
+func Test_HttpService_Echo400(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6040",
+ "workers":{"command": "php tests/worker-stop.php", "pool.numWorkers": 1}
+ }`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 3000)
+ defer c.Stop()
+ req, err := http.NewRequest("GET", "http://localhost:6040/", nil)
+ assert.NoError(t, err)
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ _ = r.Body.Close()
+ }()
+ assert.NoError(t, err)
+ assert.Equal(t, 401, r.StatusCode)
+func Test_Service_EnvPath(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6029",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 3000)
+ defer c.Stop()
+ req, err := http.NewRequest("GET", "http://localhost:6029/", nil)
+ assert.NoError(t, err)
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ if err != nil {
+ panic(err)
+ }
+ defer func() {
+ _ = r.Body.Close()
+ }()
+ b, _ := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, []byte("/ws"), b)
+func Test_Service_Disabled(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ _, s := c.Get(ID)
+ assert.Equal(t, service.StatusInactive, s)
+func Test_Service_JoinTopic(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6038",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6038", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ return
+ }
+ read <- message
+ }
+ }()
+ err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
+ assert.NoError(t, err)
+ assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
+func Test_Service_DenyJoin(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6037",
+ "workers":{"command": "php tests/worker-deny.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6037", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ read <- err
+ continue
+ }
+ read <- message
+ }
+ }()
+ err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
+ assert.NoError(t, err)
+ assert.Equal(t, `{"topic":"#join","payload":["topic"]}`, readStr(<-read))
+func Test_Service_DenyJoinServer(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6037",
+ "workers":{"command": "php tests/worker-stop.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6037", Path: "/ws"}
+ _, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.Error(t, err)
+func Test_Service_EmptyTopics(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6036",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6036", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ read <- err
+ continue
+ }
+ read <- message
+ }
+ }()
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":[]}`)))
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["a"]}`)))
+ assert.Equal(t, `{"topic":"@join","payload":["a"]}`, readStr(<-read))
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":[]}`)))
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":["a"]}`)))
+ assert.Equal(t, `{"topic":"@leave","payload":["a"]}`, readStr(<-read))
+ // must be automatically closed during service stop
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["a"]}`)))
+ assert.Equal(t, `{"topic":"@join","payload":["a"]}`, readStr(<-read))
+func Test_Service_BadTopics(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6035",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6035", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ read <- err
+ continue
+ }
+ read <- message
+ }
+ }()
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":"hello"}`)))
+ assert.Error(t, (<-read).(error))
+func Test_Service_BadTopicsLeave(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6034",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6034", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ read <- err
+ continue
+ }
+ read <- message
+ }
+ }()
+ assert.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"leave", "payload":"hello"}`)))
+ assert.Error(t, (<-read).(error))
+func Test_Service_Events(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6033",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ b, _ := c.Get(ID)
+ br := b.(*Service)
+ done := make(chan interface{})
+ br.AddListener(func(event int, ctx interface{}) {
+ if event == EventConnect {
+ close(done)
+ }
+ })
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ <-done
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ return
+ }
+ read <- message
+ }
+ }()
+ err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
+ assert.NoError(t, err)
+ assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
+func Test_Service_Warmup(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6033",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ rp, _ := c.Get(rpc.ID)
+ b, _ := c.Get(ID)
+ br := b.(*Service)
+ done := make(chan interface{})
+ br.AddListener(func(event int, ctx interface{}) {
+ if event == EventConnect {
+ close(done)
+ }
+ })
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ client, err := rp.(*rpc.Service).Client()
+ assert.NoError(t, err)
+ var ok bool
+ assert.NoError(t, client.Call("ws.SubscribePattern", "test", &ok))
+ assert.True(t, ok)
+ assert.NoError(t, client.Call("ws.Subscribe", "test", &ok))
+ assert.True(t, ok)
+ u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ <-done
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ return
+ }
+ read <- message
+ }
+ }()
+ // not delivered
+ assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
+ err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
+ assert.NoError(t, err)
+ assert.Equal(t, `{"topic":"@join","payload":["topic"]}`, readStr(<-read))
+ assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
+ assert.Equal(t, `{"topic":"topic","payload":"hello"}`, readStr(<-read))
+func Test_Service_Stop(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+ c := service.NewContainer(logger)
+ c.Register(env.ID, &env.Service{})
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(broadcast.ID, &broadcast.Service{})
+ c.Register(ID, &Service{})
+ assert.NoError(t, c.Init(&testCfg{
+ http: `{
+ "address": ":6033",
+ "workers":{"command": "php tests/worker-ok.php", "pool.numWorkers": 1}
+ }`,
+ rpc: `{"listen":"tcp://"}`,
+ ws: `{"path":"/ws"}`,
+ broadcast: `{}`,
+ }))
+ rp, _ := c.Get(rpc.ID)
+ b, _ := c.Get(ID)
+ br := b.(*Service)
+ done := make(chan interface{})
+ br.AddListener(func(event int, ctx interface{}) {
+ if event == EventConnect {
+ close(done)
+ }
+ })
+ go func() { _ = c.Serve() }()
+ time.Sleep(time.Millisecond * 1000)
+ defer c.Stop()
+ client, err := rp.(*rpc.Service).Client()
+ assert.NoError(t, err)
+ var ok bool
+ assert.NoError(t, client.Call("ws.SubscribePattern", "test", &ok))
+ assert.True(t, ok)
+ assert.NoError(t, client.Call("ws.Subscribe", "test", &ok))
+ assert.True(t, ok)
+ u := url.URL{Scheme: "ws", Host: "localhost:6033", Path: "/ws"}
+ conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ <-done
+ read := make(chan interface{})
+ go func() {
+ defer close(read)
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ return
+ }
+ read <- message
+ }
+ }()
+ // not delivered
+ assert.NoError(t, br.client.Publish(&broadcast.Message{Topic: "topic", Payload: []byte(`"hello"`)}))
+ br.Stop()
+ err = conn.WriteMessage(websocket.TextMessage, []byte(`{"topic":"join", "payload":["topic"]}`))
+ assert.NoError(t, err)
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 344102f4..3b9c12ea 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -16,11 +16,11 @@ import (
+ handler ""
httpConfig ""
- handler ""
diff --git a/plugins/http/static/etag.go b/plugins/http/static/etag.go
index 5d41cc53..70673337 100644
--- a/plugins/http/static/etag.go
+++ b/plugins/http/static/etag.go
@@ -65,7 +65,3 @@ func appendUint(dst []byte, n uint32) []byte {
dst = append(dst, buf[i:]...)
return dst
-func byteToSrt(b []byte) string {
- return *(*string)(unsafe.Pointer(&b))
diff --git a/plugins/http/worker_handler/constants.go b/plugins/http/worker_handler/constants.go
deleted file mode 100644
index 3355d9c2..00000000
--- a/plugins/http/worker_handler/constants.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package handler
-import "net/http"
-var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
-// TrailerHeaderKey http header key
-var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/plugins/http/worker_handler/errors.go b/plugins/http/worker_handler/errors.go
deleted file mode 100644
index 5fa8e64e..00000000
--- a/plugins/http/worker_handler/errors.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// +build !windows
-package handler
-import (
- "errors"
- "net"
- "os"
- "syscall"
-// Broken pipe
-var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
-// handleWriteError just check if error was caused by aborted connection on linux
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if errors.Is(syscallErr.Err, syscall.EPIPE) {
- return errEPIPE
- }
- }
- }
- return err
diff --git a/plugins/http/worker_handler/errors_windows.go b/plugins/http/worker_handler/errors_windows.go
deleted file mode 100644
index 390cc7d1..00000000
--- a/plugins/http/worker_handler/errors_windows.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// +build windows
-package handler
-import (
- "errors"
- "net"
- "os"
- "syscall"
-//Software caused connection abort.
-//An established connection was aborted by the software in your host computer,
-//possibly due to a data transmission time-out or protocol error.
-var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
-// handleWriteError just check if error was caused by aborted connection on windows
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if syscallErr.Err == syscall.WSAECONNABORTED {
- return errEPIPE
- }
- }
- }
- return err
diff --git a/plugins/http/worker_handler/handler.go b/plugins/http/worker_handler/handler.go
deleted file mode 100644
index be53fc12..00000000
--- a/plugins/http/worker_handler/handler.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package handler
-import (
- "net"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
- ""
- ""
- ""
- ""
- ""
-// MB is 1024 bytes
-const MB uint64 = 1024 * 1024
-// ErrorEvent represents singular http error event.
-type ErrorEvent struct {
- // Request contains client request, must not be stored.
- Request *http.Request
- // Error - associated error, if any.
- Error error
- // event timings
- start time.Time
- elapsed time.Duration
-// Elapsed returns duration of the invocation.
-func (e *ErrorEvent) Elapsed() time.Duration {
- return e.elapsed
-// ResponseEvent represents singular http response event.
-type ResponseEvent struct {
- // Request contains client request, must not be stored.
- Request *Request
- // Response contains service response.
- Response *Response
- // event timings
- start time.Time
- elapsed time.Duration
-// Elapsed returns duration of the invocation.
-func (e *ResponseEvent) Elapsed() time.Duration {
- return e.elapsed
-// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
-// parsed files and query, payload will include parsed form dataTree (if any).
-type Handler struct {
- maxRequestSize uint64
- uploads config.Uploads
- trusted config.Cidrs
- log logger.Logger
- pool pool.Pool
- mul sync.Mutex
- lsn events.Listener
-// NewHandler return handle interface implementation
-func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) {
- if pool == nil {
- return nil, errors.E(errors.Str("pool should be initialized"))
- }
- return &Handler{
- maxRequestSize: maxReqSize * MB,
- uploads: uploads,
- pool: pool,
- trusted: trusted,
- }, nil
-// AddListener attaches handler event controller.
-func (h *Handler) AddListener(l events.Listener) {
- h.mul.Lock()
- defer h.mul.Unlock()
- h.lsn = l
-// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
- start := time.Now()
- // validating request size
- if h.maxRequestSize != 0 {
- const op = errors.Op("http_handler_max_size")
- if length := r.Header.Get("content-length"); length != "" {
- // try to parse the value from the `content-length` header
- size, err := strconv.ParseInt(length, 10, 64)
- if err != nil {
- // if got an error while parsing -> assign 500 code to the writer and return
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)})
- return
- }
- if size > int64(h.maxRequestSize) {
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)})
- http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500)
- return
- }
- }
- }
- req, err := NewRequest(r, h.uploads)
- if err != nil {
- // if pipe is broken, there is no sense to write the header
- // in this case we just report about error
- if err == errEPIPE {
- h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
- return
- }
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
- // proxy IP resolution
- h.resolveIP(req)
- req.Open(h.log)
- defer req.Close(h.log)
- p, err := req.Payload()
- if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
- rsp, err := h.pool.Exec(p)
- if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
- resp, err := NewResponse(rsp)
- if err != nil {
- http.Error(w, errors.E(op, err).Error(), resp.Status)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- return
- }
- h.handleResponse(req, resp, start)
- err = resp.Write(w)
- if err != nil {
- http.Error(w, errors.E(op, err).Error(), 500)
- h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
- }
-// handleResponse triggers response event.
-func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
- h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
-// sendEvent invokes event handler if any.
-func (h *Handler) sendEvent(event interface{}) {
- if h.lsn != nil {
- h.lsn(event)
- }
-// get real ip passing multiple proxy
-func (h *Handler) resolveIP(r *Request) {
- if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple
- return
- }
- if r.Header.Get("X-Forwarded-For") != "" {
- ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
- ipCount := len(ips)
- for i := ipCount - 1; i >= 0; i-- {
- addr := strings.TrimSpace(ips[i])
- if net.ParseIP(addr) != nil {
- r.RemoteAddr = addr
- return
- }
- }
- return
- }
- // The logic here is the following:
- // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address
- // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
- // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
- // CF-Connecting-IP is an Enterprise feature and we check it last in order.
- // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
- if r.Header.Get("X-Real-Ip") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
- return
- }
- if r.Header.Get("True-Client-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
- return
- }
- if r.Header.Get("CF-Connecting-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
- }
diff --git a/plugins/http/worker_handler/parse.go b/plugins/http/worker_handler/parse.go
deleted file mode 100644
index 2790da2a..00000000
--- a/plugins/http/worker_handler/parse.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package handler
-import (
- "net/http"
- ""
-// MaxLevel defines maximum tree depth for incoming request data and files.
-const MaxLevel = 127
-type dataTree map[string]interface{}
-type fileTree map[string]interface{}
-// parseData parses incoming request body into data tree.
-func parseData(r *http.Request) dataTree {
- data := make(dataTree)
- if r.PostForm != nil {
- for k, v := range r.PostForm {
- data.push(k, v)
- }
- }
- if r.MultipartForm != nil {
- for k, v := range r.MultipartForm.Value {
- data.push(k, v)
- }
- }
- return data
-// pushes value into data tree.
-func (d dataTree) push(k string, v []string) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-// mount mounts data tree recursively.
-func (d dataTree) mount(i []string, v []string) {
- if len(i) == 1 {
- // single value context (last element)
- d[i[0]] = v[len(v)-1]
- return
- }
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
- if p, ok := d[i[0]]; ok {
- p.(dataTree).mount(i[1:], v)
- return
- }
- d[i[0]] = make(dataTree)
- d[i[0]].(dataTree).mount(i[1:], v)
-// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
-func parseUploads(r *http.Request, cfg config.Uploads) *Uploads {
- u := &Uploads{
- cfg: cfg,
- tree: make(fileTree),
- list: make([]*FileUpload, 0),
- }
- for k, v := range r.MultipartForm.File {
- files := make([]*FileUpload, 0, len(v))
- for _, f := range v {
- files = append(files, NewUpload(f))
- }
- u.list = append(u.list, files...)
- u.tree.push(k, files)
- }
- return u
-// pushes new file upload into it's proper place.
-func (d fileTree) push(k string, v []*FileUpload) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-// mount mounts data tree recursively.
-func (d fileTree) mount(i []string, v []*FileUpload) {
- if len(i) == 1 {
- // single value context
- d[i[0]] = v[0]
- return
- }
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
- if p, ok := d[i[0]]; ok {
- p.(fileTree).mount(i[1:], v)
- return
- }
- d[i[0]] = make(fileTree)
- d[i[0]].(fileTree).mount(i[1:], v)
-// FetchIndexes parses input name and splits it into separate indexes list.
-func FetchIndexes(s string) []string {
- var (
- pos int
- ch string
- keys = make([]string, 1)
- )
- for _, c := range s {
- ch = string(c)
- switch ch {
- case " ":
- // ignore all spaces
- continue
- case "[":
- pos = 1
- continue
- case "]":
- if pos == 1 {
- keys = append(keys, "")
- }
- pos = 2
- default:
- if pos == 1 || pos == 2 {
- keys = append(keys, "")
- }
- keys[len(keys)-1] += ch
- pos = 0
- }
- }
- return keys
diff --git a/plugins/http/worker_handler/request.go b/plugins/http/worker_handler/request.go
deleted file mode 100644
index 178bc827..00000000
--- a/plugins/http/worker_handler/request.go
+++ /dev/null
@@ -1,187 +0,0 @@
-package handler
-import (
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "strings"
- j ""
- ""
- ""
- ""
- ""
-var json = j.ConfigCompatibleWithStandardLibrary
-const (
- defaultMaxMemory = 32 << 20 // 32 MB
- contentNone = iota + 900
- contentStream
- contentMultipart
- contentFormData
-// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
-type Request struct {
- // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
- RemoteAddr string `json:"remoteAddr"`
- // Protocol includes HTTP protocol version.
- Protocol string `json:"protocol"`
- // Method contains name of HTTP method used for the request.
- Method string `json:"method"`
- // URI contains full request URI with scheme and query.
- URI string `json:"uri"`
- // Header contains list of request headers.
- Header http.Header `json:"headers"`
- // Cookies contains list of request cookies.
- Cookies map[string]string `json:"cookies"`
- // RawQuery contains non parsed query string (to be parsed on php end).
- RawQuery string `json:"rawQuery"`
- // Parsed indicates that request body has been parsed on RR end.
- Parsed bool `json:"parsed"`
- // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
- Uploads *Uploads `json:"uploads"`
- // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
- Attributes map[string]interface{} `json:"attributes"`
- // request body can be parsedData or []byte
- body interface{}
-func fetchIP(pair string) string {
- if !strings.ContainsRune(pair, ':') {
- return pair
- }
- addr, _, _ := net.SplitHostPort(pair)
- return addr
-// NewRequest creates new PSR7 compatible request using net/http request.
-func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) {
- req := &Request{
- RemoteAddr: fetchIP(r.RemoteAddr),
- Protocol: r.Proto,
- Method: r.Method,
- URI: uri(r),
- Header: r.Header,
- Cookies: make(map[string]string),
- RawQuery: r.URL.RawQuery,
- Attributes: attributes.All(r),
- }
- for _, c := range r.Cookies() {
- if v, err := url.QueryUnescape(c.Value); err == nil {
- req.Cookies[c.Name] = v
- }
- }
- switch req.contentType() {
- case contentNone:
- return req, nil
- case contentStream:
- var err error
- req.body, err = ioutil.ReadAll(r.Body)
- return req, err
- case contentMultipart:
- if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
- return nil, err
- }
- req.Uploads = parseUploads(r, cfg)
- fallthrough
- case contentFormData:
- if err := r.ParseForm(); err != nil {
- return nil, err
- }
- req.body = parseData(r)
- }
- req.Parsed = true
- return req, nil
-// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
- r.Uploads.Open(log)
-// Close clears all temp file uploads
-func (r *Request) Close(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
- r.Uploads.Clear(log)
-// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
-// files prior to calling this method.
-func (r *Request) Payload() (payload.Payload, error) {
- p := payload.Payload{}
- var err error
- if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, err
- }
- if r.Parsed {
- if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, err
- }
- } else if r.body != nil {
- p.Body = r.body.([]byte)
- }
- return p, nil
-// contentType returns the payload content type.
-func (r *Request) contentType() int {
- if r.Method == "HEAD" || r.Method == "OPTIONS" {
- return contentNone
- }
- ct := r.Header.Get("content-type")
- if strings.Contains(ct, "application/x-www-form-urlencoded") {
- return contentFormData
- }
- if strings.Contains(ct, "multipart/form-data") {
- return contentMultipart
- }
- return contentStream
-// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
-func uri(r *http.Request) string {
- if r.URL.Host != "" {
- return r.URL.String()
- }
- if r.TLS != nil {
- return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
- }
- return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
diff --git a/plugins/http/worker_handler/response.go b/plugins/http/worker_handler/response.go
deleted file mode 100644
index 1763d304..00000000
--- a/plugins/http/worker_handler/response.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package handler
-import (
- "io"
- "net/http"
- "strings"
- "sync"
- ""
-// Response handles PSR7 response logic.
-type Response struct {
- // Status contains response status.
- Status int `json:"status"`
- // Header contains list of response headers.
- Headers map[string][]string `json:"headers"`
- // associated Body payload.
- Body interface{}
- sync.Mutex
-// NewResponse creates new response based on given pool payload.
-func NewResponse(p payload.Payload) (*Response, error) {
- r := &Response{Body: p.Body}
- if err := json.Unmarshal(p.Context, r); err != nil {
- return nil, err
- }
- return r, nil
-// Write writes response headers, status and body into ResponseWriter.
-func (r *Response) Write(w http.ResponseWriter) error {
- // INFO map is the reference type in golang
- p := handlePushHeaders(r.Headers)
- if pusher, ok := w.(http.Pusher); ok {
- for _, v := range p {
- err := pusher.Push(v, nil)
- if err != nil {
- return err
- }
- }
- }
- handleTrailers(r.Headers)
- for n, h := range r.Headers {
- for _, v := range h {
- w.Header().Add(n, v)
- }
- }
- w.WriteHeader(r.Status)
- if data, ok := r.Body.([]byte); ok {
- _, err := w.Write(data)
- if err != nil {
- return handleWriteError(err)
- }
- }
- if rc, ok := r.Body.(io.Reader); ok {
- if _, err := io.Copy(w, rc); err != nil {
- return err
- }
- }
- return nil
-func handlePushHeaders(h map[string][]string) []string {
- var p []string
- pushHeader, ok := h[http2pushHeaderKey]
- if !ok {
- return p
- }
- p = append(p, pushHeader...)
- delete(h, http2pushHeaderKey)
- return p
-func handleTrailers(h map[string][]string) {
- trailers, ok := h[TrailerHeaderKey]
- if !ok {
- return
- }
- for _, tr := range trailers {
- for _, n := range strings.Split(tr, ",") {
- n = strings.Trim(n, "\t ")
- if v, ok := h[n]; ok {
- h["Trailer:"+n] = v
- delete(h, n)
- }
- }
- }
- delete(h, TrailerHeaderKey)
diff --git a/plugins/http/worker_handler/uploads.go b/plugins/http/worker_handler/uploads.go
deleted file mode 100644
index e695000e..00000000
--- a/plugins/http/worker_handler/uploads.go
+++ /dev/null
@@ -1,159 +0,0 @@
-package handler
-import (
- ""
- ""
- "io"
- "io/ioutil"
- "mime/multipart"
- "os"
- "sync"
-const (
- // UploadErrorOK - no error, the file uploaded with success.
- UploadErrorOK = 0
- // UploadErrorNoFile - no file was uploaded.
- UploadErrorNoFile = 4
- // UploadErrorNoTmpDir - missing a temporary folder.
- UploadErrorNoTmpDir = 6
- // UploadErrorCantWrite - failed to write file to disk.
- UploadErrorCantWrite = 7
- // UploadErrorExtension - forbidden file extension.
- UploadErrorExtension = 8
-// Uploads tree manages uploaded files tree and temporary files.
-type Uploads struct {
- // associated temp directory and forbidden extensions.
- cfg config.Uploads
- // pre processed data tree for Uploads.
- tree fileTree
- // flat list of all file Uploads.
- list []*FileUpload
-// MarshalJSON marshal tree tree into JSON.
-func (u *Uploads) MarshalJSON() ([]byte, error) {
- return json.Marshal(u.tree)
-// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
-// will be handled individually.
-func (u *Uploads) Open(log logger.Logger) {
- var wg sync.WaitGroup
- for _, f := range u.list {
- wg.Add(1)
- go func(f *FileUpload) {
- defer wg.Done()
- err := f.Open(u.cfg)
- if err != nil && log != nil {
- log.Error("error opening the file", "err", err)
- }
- }(f)
- }
- wg.Wait()
-// Clear deletes all temporary files.
-func (u *Uploads) Clear(log logger.Logger) {
- for _, f := range u.list {
- if f.TempFilename != "" && exists(f.TempFilename) {
- err := os.Remove(f.TempFilename)
- if err != nil && log != nil {
- log.Error("error removing the file", "err", err)
- }
- }
- }
-// FileUpload represents singular file NewUpload.
-type FileUpload struct {
- // ID contains filename specified by the client.
- Name string `json:"name"`
- // Mime contains mime-type provided by the client.
- Mime string `json:"mime"`
- // Size of the uploaded file.
- Size int64 `json:"size"`
- // Error indicates file upload error (if any). See
- Error int `json:"error"`
- // TempFilename points to temporary file location.
- TempFilename string `json:"tmpName"`
- // associated file header
- header *multipart.FileHeader
-// NewUpload wraps net/http upload into PRS-7 compatible structure.
-func NewUpload(f *multipart.FileHeader) *FileUpload {
- return &FileUpload{
- Name: f.Filename,
- Mime: f.Header.Get("Content-Type"),
- Error: UploadErrorOK,
- header: f,
- }
-// Open moves file content into temporary file available for PHP.
-// NOTE:
-// There is 2 deferred functions, and in case of getting 2 errors from both functions
-// error from close of temp file would be overwritten by error from the main file
-func (f *FileUpload) Open(cfg config.Uploads) (err error) {
- if cfg.Forbids(f.Name) {
- f.Error = UploadErrorExtension
- return nil
- }
- file, err := f.header.Open()
- if err != nil {
- f.Error = UploadErrorNoFile
- return err
- }
- defer func() {
- // close the main file
- err = file.Close()
- }()
- tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
- if err != nil {
- // most likely cause of this issue is missing tmp dir
- f.Error = UploadErrorNoTmpDir
- return err
- }
- f.TempFilename = tmp.Name()
- defer func() {
- // close the temp file
- err = tmp.Close()
- }()
- if f.Size, err = io.Copy(tmp, file); err != nil {
- f.Error = UploadErrorCantWrite
- }
- return err
-// exists if file exists.
-func exists(path string) bool {
- if _, err := os.Stat(path); os.IsNotExist(err) {
- return false
- }
- return true
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 4947dbe3..240a28d1 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -216,6 +216,3 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
-func strConvert(s []byte) string {
- return *(*string)(unsafe.Pointer(&s))
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index 484cc23e..f00a0fd3 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter {
return logAdapter
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 22b568d8..13588b6e 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -260,8 +260,3 @@ func (server *Plugin) collectWorkerLogs(event interface{}) {
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
diff --git a/plugins/service/process.go b/plugins/service/process.go
index 49219eb0..74aa789c 100644
--- a/plugins/service/process.go
+++ b/plugins/service/process.go
@@ -145,9 +145,3 @@ func (p *Process) execHandler() {
-// unsafe and fast []byte to string convert
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))