summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-03 22:52:30 +0300
committerValery Piashchynski <[email protected]>2021-05-03 22:52:30 +0300
commit9ee78f937d5be67058882dd3590f89da35bca239 (patch)
tree17cda27feabf5f2b8afc6a2796117835045afd36 /plugins/broadcast/root
parent009b7009885d8a15e6fa6c7e78436087b2f20129 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/root')
-rw-r--r--plugins/broadcast/root/Makefile9
-rw-r--r--plugins/broadcast/root/broker.go36
-rw-r--r--plugins/broadcast/root/client.go133
-rw-r--r--plugins/broadcast/root/client_test.go59
-rw-r--r--plugins/broadcast/root/config.go61
-rw-r--r--plugins/broadcast/root/config_test.go60
-rw-r--r--plugins/broadcast/root/router.go170
-rw-r--r--plugins/broadcast/root/rpc.go25
-rw-r--r--plugins/broadcast/root/rpc_test.go72
-rw-r--r--plugins/broadcast/root/service.go85
-rw-r--r--plugins/broadcast/root/service_test.go65
-rw-r--r--plugins/broadcast/root/tests/.rr.yaml2
-rw-r--r--plugins/broadcast/root/tests/Broadcast/BroadcastTest.php56
-rw-r--r--plugins/broadcast/root/tests/Broadcast/MessageTest.php24
-rw-r--r--plugins/broadcast/root/tests/bootstrap.php15
-rw-r--r--plugins/broadcast/root/tests/docker-compose.yml9
-rw-r--r--plugins/broadcast/root/tests/go-client.go78
17 files changed, 959 insertions, 0 deletions
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile
new file mode 100644
index 00000000..d88312d2
--- /dev/null
+++ b/plugins/broadcast/root/Makefile
@@ -0,0 +1,9 @@
+clean:
+ rm -rf rr-jobbroadcast
+install: all
+ cp rr-broadcast /usr/local/bin/rr-broadcast
+uninstall:
+ rm -f /usr/local/bin/rr-broadcast
+test:
+ composer update
+ go test -v -race -cover
diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/root/broker.go
new file mode 100644
index 00000000..923c8105
--- /dev/null
+++ b/plugins/broadcast/root/broker.go
@@ -0,0 +1,36 @@
+package broadcast
+
+import "encoding/json"
+
+// Broker defines the ability to operate as message passing broker.
+type Broker interface {
+ // Serve serves broker.
+ Serve() error
+
+ // Stop closes the consumption and disconnects broker.
+ Stop()
+
+ // Subscribe broker to one or multiple topics.
+ Subscribe(upstream chan *Message, topics ...string) error
+
+ // SubscribePattern broker to pattern.
+ SubscribePattern(upstream chan *Message, pattern string) error
+
+ // Unsubscribe broker from one or multiple topics.
+ Unsubscribe(upstream chan *Message, topics ...string) error
+
+ // UnsubscribePattern broker from pattern.
+ UnsubscribePattern(upstream chan *Message, pattern string) error
+
+ // Publish one or multiple Channel.
+ Publish(messages ...*Message) error
+}
+
+// Message represent single message.
+type Message struct {
+ // Topic message been pushed into.
+ Topic string `json:"topic"`
+
+ // Payload to be broadcasted. Must be valid json when transferred over RPC.
+ Payload json.RawMessage `json:"payload"`
+}
diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go
new file mode 100644
index 00000000..c5761f94
--- /dev/null
+++ b/plugins/broadcast/root/client.go
@@ -0,0 +1,133 @@
+package broadcast
+
+import "sync"
+
+// Client subscribes to a given topic and consumes or publish messages to it.
+type Client struct {
+ upstream chan *Message
+ broker Broker
+ mu sync.Mutex
+ topics []string
+ patterns []string
+}
+
+// Channel returns incoming messages channel.
+func (c *Client) Channel() chan *Message {
+ return c.upstream
+}
+
+// Publish message into associated topic or topics.
+func (c *Client) Publish(msg ...*Message) error {
+ return c.broker.Publish(msg...)
+}
+
+// Subscribe client to specific topics.
+func (c *Client) Subscribe(topics ...string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ newTopics := make([]string, 0)
+ for _, topic := range topics {
+ found := false
+ for _, e := range c.topics {
+ if e == topic {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ newTopics = append(newTopics, topic)
+ }
+ }
+
+ if len(newTopics) == 0 {
+ return nil
+ }
+
+ c.topics = append(c.topics, newTopics...)
+
+ return c.broker.Subscribe(c.upstream, newTopics...)
+}
+
+// SubscribePattern subscribe client to the specific topic pattern.
+func (c *Client) SubscribePattern(pattern string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, g := range c.patterns {
+ if g == pattern {
+ return nil
+ }
+ }
+
+ c.patterns = append(c.patterns, pattern)
+ return c.broker.SubscribePattern(c.upstream, pattern)
+}
+
+// Unsubscribe client from specific topics
+func (c *Client) Unsubscribe(topics ...string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ dropTopics := make([]string, 0)
+ for _, topic := range topics {
+ for i, e := range c.topics {
+ if e == topic {
+ c.topics = append(c.topics[:i], c.topics[i+1:]...)
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+
+ if len(dropTopics) == 0 {
+ return nil
+ }
+
+ return c.broker.Unsubscribe(c.upstream, dropTopics...)
+}
+
+// UnsubscribePattern client from topic pattern.
+func (c *Client) UnsubscribePattern(pattern string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for i := range c.patterns {
+ if c.patterns[i] == pattern {
+ c.patterns = append(c.patterns[:i], c.patterns[i+1:]...)
+
+ return c.broker.UnsubscribePattern(c.upstream, pattern)
+ }
+ }
+
+ return nil
+}
+
+// Topics return all the topics client subscribed to.
+func (c *Client) Topics() []string {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.topics
+}
+
+// Patterns return all the patterns client subscribed to.
+func (c *Client) Patterns() []string {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.patterns
+}
+
+// Close the client and consumption.
+func (c *Client) Close() (err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if len(c.topics) != 0 {
+ err = c.broker.Unsubscribe(c.upstream, c.topics...)
+ }
+
+ close(c.upstream)
+ return err
+}
diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go
new file mode 100644
index 00000000..52a50d57
--- /dev/null
+++ b/plugins/broadcast/root/client_test.go
@@ -0,0 +1,59 @@
+package broadcast
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Client_Topics(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+
+ client := br.NewClient()
+ defer client.Close()
+
+ assert.Equal(t, []string{}, client.Topics())
+
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+
+ assert.NoError(t, client.Subscribe("topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+
+ assert.NoError(t, br.broker.Subscribe(client.upstream, "topic"))
+ assert.Equal(t, []string{"topic"}, client.Topics())
+
+ assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, client.Unsubscribe("topic"))
+ assert.NoError(t, br.broker.Unsubscribe(client.upstream, "topic"))
+
+ assert.Equal(t, []string{}, client.Topics())
+}
+
+func Test_Client_Patterns(t *testing.T) {
+ br, _, c := setup(`{}`)
+ defer c.Stop()
+
+ client := br.NewClient()
+ defer client.Close()
+
+ assert.Equal(t, []string{}, client.Patterns())
+
+ assert.NoError(t, client.SubscribePattern("topic/*"))
+ assert.Equal(t, []string{"topic/*"}, client.Patterns())
+
+ assert.NoError(t, br.broker.SubscribePattern(client.upstream, "topic/*"))
+ assert.Equal(t, []string{"topic/*"}, client.Patterns())
+
+ assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
+ assert.Equal(t, `hello1`, readStr(<-client.Channel()))
+
+ assert.NoError(t, client.UnsubscribePattern("topic/*"))
+ assert.NoError(t, br.broker.UnsubscribePattern(client.upstream, "topic/*"))
+
+ assert.Equal(t, []string{}, client.Patterns())
+}
diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go
new file mode 100644
index 00000000..8c732441
--- /dev/null
+++ b/plugins/broadcast/root/config.go
@@ -0,0 +1,61 @@
+package broadcast
+
+import (
+ "errors"
+
+ "github.com/go-redis/redis/v8"
+)
+
+// Config configures the broadcast extension.
+type Config struct {
+ // RedisConfig configures redis broker.
+ Redis *RedisConfig
+}
+
+// Hydrate reads the configuration values from the source configuration.
+//func (c *Config) Hydrate(cfg service.Config) error {
+// if err := cfg.Unmarshal(c); err != nil {
+// return err
+// }
+//
+// if c.Redis != nil {
+// return c.Redis.isValid()
+// }
+//
+// return nil
+//}
+
+// InitDefaults enables in memory broadcast configuration.
+func (c *Config) InitDefaults() error {
+ return nil
+}
+
+// RedisConfig configures redis broker.
+type RedisConfig struct {
+ // Addr of the redis server.
+ Addr string
+
+ // Password to redis server.
+ Password string
+
+ // DB index.
+ DB int
+}
+
+// clusterOptions
+func (cfg *RedisConfig) redisClient() redis.UniversalClient {
+ return redis.NewClient(&redis.Options{
+ Addr: cfg.Addr,
+ Password: cfg.Password,
+ PoolSize: 2,
+ })
+}
+
+// check if redis config is valid.
+func (cfg *RedisConfig) isValid() error {
+ if cfg.Addr == "" {
+ return errors.New("redis addr is required")
+ }
+
+ return nil
+}
diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go
new file mode 100644
index 00000000..28191c6b
--- /dev/null
+++ b/plugins/broadcast/root/config_test.go
@@ -0,0 +1,60 @@
+package broadcast
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/stretchr/testify/assert"
+)
+
+type testCfg struct {
+ rpc string
+ broadcast string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.broadcast}
+ }
+
+ if name == rpc.ID {
+ return &testCfg{target: cfg.rpc}
+ }
+
+ return nil
+}
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ return json.Unmarshal([]byte(cfg.target), out)
+}
+
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &testCfg{target: `{"dead`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_OK(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Redis_Error(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path","redis":{}}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Redis_OK(t *testing.T) {
+ cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
new file mode 100644
index 00000000..91137f8b
--- /dev/null
+++ b/plugins/broadcast/root/router.go
@@ -0,0 +1,170 @@
+package broadcast
+
+//import "github.com/gobwas/glob"
+
+// Router performs internal message routing to multiple subscribers.
+type Router struct {
+ wildcard map[string]wildcard
+ routes map[string][]chan *Message
+}
+
+// wildcard handles number of topics via glob pattern.
+type wildcard struct {
+ //glob glob.Glob
+ upstream []chan *Message
+}
+
+// helper for blocking join/leave flow
+type subscriber struct {
+ upstream chan *Message
+ done chan error
+ topics []string
+ pattern string
+}
+
+// NewRouter creates new topic and pattern router.
+func NewRouter() *Router {
+ return &Router{
+ wildcard: make(map[string]wildcard),
+ routes: make(map[string][]chan *Message),
+ }
+}
+
+// Dispatch to all connected topics.
+func (r *Router) Dispatch(msg *Message) {
+ for _, w := range r.wildcard {
+ if w.glob.Match(msg.Topic) {
+ for _, upstream := range w.upstream {
+ upstream <- msg
+ }
+ }
+ }
+
+ if routes, ok := r.routes[msg.Topic]; ok {
+ for _, upstream := range routes {
+ upstream <- msg
+ }
+ }
+}
+
+// Subscribe to topic and return list of newly assigned topics.
+func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
+ newTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ r.routes[topic] = []chan *Message{upstream}
+ if !r.collapsed(topic) {
+ newTopics = append(newTopics, topic)
+ }
+ continue
+ }
+
+ joined := false
+ for _, up := range r.routes[topic] {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+
+ if !joined {
+ r.routes[topic] = append(r.routes[topic], upstream)
+ }
+ }
+
+ return newTopics
+}
+
+// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
+func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
+ dropTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ // no such topic, ignore
+ continue
+ }
+
+ for i := range r.routes[topic] {
+ if r.routes[topic][i] == upstream {
+ r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
+ break
+ }
+ }
+
+ if len(r.routes[topic]) == 0 {
+ delete(r.routes, topic)
+
+ // standalone empty subscription
+ if !r.collapsed(topic) {
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+
+ return dropTopics
+}
+
+// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
+// case if blob is invalid.
+func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
+ if w, ok := r.wildcard[pattern]; ok {
+ joined := false
+ for _, up := range w.upstream {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+
+ if !joined {
+ w.upstream = append(w.upstream, upstream)
+ }
+
+ return nil, nil
+ }
+
+ g, err := glob.Compile(pattern)
+ if err != nil {
+ return nil, err
+ }
+
+ r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
+
+ return []string{pattern}, nil
+}
+
+// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
+func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
+ // todo: store and return collapsed topics
+
+ w, ok := r.wildcard[pattern]
+ if !ok {
+ // no such pattern
+ return nil
+ }
+
+ for i, up := range w.upstream {
+ if up == upstream {
+ w.upstream[i] = w.upstream[len(w.upstream)-1]
+ w.upstream[len(w.upstream)-1] = nil
+ w.upstream = w.upstream[:len(w.upstream)-1]
+
+ if len(w.upstream) == 0 {
+ delete(r.wildcard, pattern)
+ return []string{pattern}
+ }
+ }
+ }
+
+ return nil
+}
+
+func (r *Router) collapsed(topic string) bool {
+ for _, w := range r.wildcard {
+ if w.glob.Match(topic) {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go
new file mode 100644
index 00000000..5604a574
--- /dev/null
+++ b/plugins/broadcast/root/rpc.go
@@ -0,0 +1,25 @@
+package broadcast
+
+import "golang.org/x/sync/errgroup"
+
+type rpcService struct {
+ svc *Service
+}
+
+// Publish Messages.
+func (r *rpcService) Publish(msg []*Message, ok *bool) error {
+ *ok = true
+ return r.svc.Publish(msg...)
+}
+
+// Publish Messages in async mode. Blocks until get an err or nil from publish
+func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error {
+ *ok = true
+ g := &errgroup.Group{}
+
+ g.Go(func() error {
+ return r.svc.Publish(msg...)
+ })
+
+ return g.Wait()
+}
diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go
new file mode 100644
index 00000000..157c4e70
--- /dev/null
+++ b/plugins/broadcast/root/rpc_test.go
@@ -0,0 +1,72 @@
+package broadcast
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestRPC_Broadcast(t *testing.T) {
+ br, rpc, c := setup(`{}`)
+ defer c.Stop()
+
+ client := br.NewClient()
+ defer client.Close()
+
+ rcpClient, err := rpc.Client()
+ assert.NoError(t, err)
+
+ // must not be delivered
+ ok := false
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello1"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+
+ assert.NoError(t, client.Subscribe("topic"))
+
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello1"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello1"`, readStr(<-client.Channel()))
+
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello2"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello2"`, readStr(<-client.Channel()))
+
+ assert.NoError(t, client.Unsubscribe("topic"))
+
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello3"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+
+ assert.NoError(t, client.Subscribe("topic"))
+
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.Publish",
+ []*Message{newMessage("topic", `"hello4"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello4"`, readStr(<-client.Channel()))
+
+ assert.NoError(t, rcpClient.Call(
+ "broadcast.PublishAsync",
+ []*Message{newMessage("topic", `"hello5"`)},
+ &ok,
+ ))
+ assert.True(t, ok)
+ assert.Equal(t, `"hello5"`, readStr(<-client.Channel()))
+}
diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go
new file mode 100644
index 00000000..8b175b3e
--- /dev/null
+++ b/plugins/broadcast/root/service.go
@@ -0,0 +1,85 @@
+package broadcast
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/spiral/roadrunner/service/rpc"
+)
+
+// ID defines public service name.
+const ID = "broadcast"
+
+// Service manages even broadcasting and websocket interface.
+type Service struct {
+ // service and broker configuration
+ cfg *Config
+
+ // broker
+ mu sync.Mutex
+ broker Broker
+}
+
+// Init service.
+func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) {
+ s.cfg = cfg
+
+ if rpc != nil {
+ if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
+ return false, err
+ }
+ }
+
+ s.mu.Lock()
+ if s.cfg.Redis != nil {
+ if s.broker, err = redisBroker(s.cfg.Redis); err != nil {
+ return false, err
+ }
+ } else {
+ s.broker = memoryBroker()
+ }
+ s.mu.Unlock()
+
+ return true, nil
+}
+
+// Serve broadcast broker.
+func (s *Service) Serve() (err error) {
+ return s.broker.Serve()
+}
+
+// Stop closes broadcast broker.
+func (s *Service) Stop() {
+ broker := s.Broker()
+ if broker != nil {
+ broker.Stop()
+ }
+}
+
+// Broker returns associated broker.
+func (s *Service) Broker() Broker {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ return s.broker
+}
+
+// NewClient returns single connected client with ability to consume or produce into associated topic(svc).
+func (s *Service) NewClient() *Client {
+ return &Client{
+ upstream: make(chan *Message),
+ broker: s.Broker(),
+ topics: make([]string, 0),
+ patterns: make([]string, 0),
+ }
+}
+
+// Publish one or multiple Channel.
+func (s *Service) Publish(msg ...*Message) error {
+ broker := s.Broker()
+ if broker == nil {
+ return errors.New("no stopped broker")
+ }
+
+ return s.Broker().Publish(msg...)
+}
diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go
new file mode 100644
index 00000000..10b924cc
--- /dev/null
+++ b/plugins/broadcast/root/service_test.go
@@ -0,0 +1,65 @@
+package broadcast
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/stretchr/testify/assert"
+)
+
+var rpcPort = 6010
+
+func setup(cfg string) (*Service, *rpc.Service, service.Container) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{
+ broadcast: cfg,
+ rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort),
+ })
+
+ rpcPort++
+
+ if err != nil {
+ panic(err)
+ }
+
+ go func() {
+ err = c.Serve()
+ if err != nil {
+ panic(err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
+
+ b, _ := c.Get(ID)
+ br := b.(*Service)
+
+ r, _ := c.Get(rpc.ID)
+ rp := r.(*rpc.Service)
+
+ return br, rp, c
+}
+
+func readStr(m *Message) string {
+ return strings.TrimRight(string(m.Payload), "\n")
+}
+
+func newMessage(t, m string) *Message {
+ return &Message{Topic: t, Payload: []byte(m)}
+}
+
+func TestService_Publish(t *testing.T) {
+ svc := &Service{}
+ assert.Error(t, svc.Publish(nil))
+}
diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml
new file mode 100644
index 00000000..c35a12fc
--- /dev/null
+++ b/plugins/broadcast/root/tests/.rr.yaml
@@ -0,0 +1,2 @@
+broadcast:
+ redis.addr: "localhost:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
new file mode 100644
index 00000000..d6014bf0
--- /dev/null
+++ b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
@@ -0,0 +1,56 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Broadcast\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Broadcast\Broadcast;
+use Spiral\Broadcast\Exception\BroadcastException;
+use Spiral\Broadcast\Message;
+use Spiral\Goridge\RPC;
+use Spiral\Goridge\SocketRelay;
+
+class BroadcastTest extends TestCase
+{
+ public function testBroadcast(): void
+ {
+ $rpc = new RPC(new SocketRelay('localhost', 6001));
+ $br = new Broadcast($rpc);
+
+ $br->publish(
+ new Message('tests/topic', 'hello'),
+ new Message('tests/123', ['key' => 'value'])
+ );
+
+ while (filesize(__DIR__ . '/../log.txt') < 40) {
+ clearstatcache(true, __DIR__ . '/../log.txt');
+ usleep(1000);
+ }
+
+ clearstatcache(true, __DIR__ . '/../log.txt');
+ $content = file_get_contents(__DIR__ . '/../log.txt');
+
+ $this->assertSame('tests/topic: "hello"
+tests/123: {"key":"value"}
+', $content);
+ }
+
+ public function testBroadcastException(): void
+ {
+ $rpc = new RPC(new SocketRelay('localhost', 6002));
+ $br = new Broadcast($rpc);
+
+ $this->expectException(BroadcastException::class);
+ $br->publish(
+ new Message('topic', 'hello')
+ );
+ }
+}
diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
new file mode 100644
index 00000000..dd9e1cc3
--- /dev/null
+++ b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
@@ -0,0 +1,24 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\Broadcast\Tests;
+
+use PHPUnit\Framework\TestCase;
+use Spiral\Broadcast\Message;
+
+class MessageTest extends TestCase
+{
+ public function testSerialize(): void
+ {
+ $m = new Message('topic', ['hello' => 'world']);
+ $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m));
+ }
+}
diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php
new file mode 100644
index 00000000..d0dfb88b
--- /dev/null
+++ b/plugins/broadcast/root/tests/bootstrap.php
@@ -0,0 +1,15 @@
+<?php
+
+/**
+ * Spiral Framework, SpiralScout LLC.
+ *
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+error_reporting(E_ALL | E_STRICT);
+ini_set('display_errors', 'stderr');
+
+//Composer
+require dirname(__DIR__) . '/vendor_php/autoload.php';
diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml
new file mode 100644
index 00000000..123aa9b9
--- /dev/null
+++ b/plugins/broadcast/root/tests/docker-compose.yml
@@ -0,0 +1,9 @@
+version: '3'
+
+services:
+ redis:
+ image: 'bitnami/redis:latest'
+ environment:
+ - ALLOW_EMPTY_PASSWORD=yes
+ ports:
+ - "6379:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go
new file mode 100644
index 00000000..21442a01
--- /dev/null
+++ b/plugins/broadcast/root/tests/go-client.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/spiral/broadcast/v2"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/service/rpc"
+ "golang.org/x/sync/errgroup"
+)
+
+type logService struct {
+ broadcast *broadcast.Service
+ stop chan interface{}
+}
+
+func (l *logService) Init(service *broadcast.Service) (bool, error) {
+ l.broadcast = service
+
+ return true, nil
+}
+
+func (l *logService) Serve() error {
+ l.stop = make(chan interface{})
+
+ client := l.broadcast.NewClient()
+ if err := client.SubscribePattern("tests/*"); err != nil {
+ return err
+ }
+
+ logFile, _ := os.Create("log.txt")
+
+ g := &errgroup.Group{}
+ g.Go(func() error {
+ for msg := range client.Channel() {
+ _, err := logFile.Write([]byte(fmt.Sprintf(
+ "%s: %s\n",
+ msg.Topic,
+ string(msg.Payload),
+ )))
+ if err != nil {
+ return err
+ }
+
+ err = logFile.Sync()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+
+ <-l.stop
+ err := logFile.Close()
+ if err != nil {
+ return err
+ }
+
+ err = client.Close()
+ if err != nil {
+ return err
+ }
+
+ return g.Wait()
+}
+
+func (l *logService) Stop() {
+ close(l.stop)
+}
+
+func main() {
+ rr.Container.Register(rpc.ID, &rpc.Service{})
+ rr.Container.Register(broadcast.ID, &broadcast.Service{})
+ rr.Container.Register("log", &logService{})
+
+ rr.Execute()
+}