diff options
author | Valery Piashchynski <[email protected]> | 2021-05-03 22:52:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-03 22:52:30 +0300 |
commit | 9ee78f937d5be67058882dd3590f89da35bca239 (patch) | |
tree | 17cda27feabf5f2b8afc6a2796117835045afd36 /plugins/broadcast/root | |
parent | 009b7009885d8a15e6fa6c7e78436087b2f20129 (diff) |
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/root')
-rw-r--r-- | plugins/broadcast/root/Makefile | 9 | ||||
-rw-r--r-- | plugins/broadcast/root/broker.go | 36 | ||||
-rw-r--r-- | plugins/broadcast/root/client.go | 133 | ||||
-rw-r--r-- | plugins/broadcast/root/client_test.go | 59 | ||||
-rw-r--r-- | plugins/broadcast/root/config.go | 61 | ||||
-rw-r--r-- | plugins/broadcast/root/config_test.go | 60 | ||||
-rw-r--r-- | plugins/broadcast/root/router.go | 170 | ||||
-rw-r--r-- | plugins/broadcast/root/rpc.go | 25 | ||||
-rw-r--r-- | plugins/broadcast/root/rpc_test.go | 72 | ||||
-rw-r--r-- | plugins/broadcast/root/service.go | 85 | ||||
-rw-r--r-- | plugins/broadcast/root/service_test.go | 65 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/.rr.yaml | 2 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/Broadcast/BroadcastTest.php | 56 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/Broadcast/MessageTest.php | 24 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/bootstrap.php | 15 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/docker-compose.yml | 9 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/go-client.go | 78 |
17 files changed, 959 insertions, 0 deletions
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile new file mode 100644 index 00000000..d88312d2 --- /dev/null +++ b/plugins/broadcast/root/Makefile @@ -0,0 +1,9 @@ +clean: + rm -rf rr-jobbroadcast +install: all + cp rr-broadcast /usr/local/bin/rr-broadcast +uninstall: + rm -f /usr/local/bin/rr-broadcast +test: + composer update + go test -v -race -cover diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/root/broker.go new file mode 100644 index 00000000..923c8105 --- /dev/null +++ b/plugins/broadcast/root/broker.go @@ -0,0 +1,36 @@ +package broadcast + +import "encoding/json" + +// Broker defines the ability to operate as message passing broker. +type Broker interface { + // Serve serves broker. + Serve() error + + // Stop closes the consumption and disconnects broker. + Stop() + + // Subscribe broker to one or multiple topics. + Subscribe(upstream chan *Message, topics ...string) error + + // SubscribePattern broker to pattern. + SubscribePattern(upstream chan *Message, pattern string) error + + // Unsubscribe broker from one or multiple topics. + Unsubscribe(upstream chan *Message, topics ...string) error + + // UnsubscribePattern broker from pattern. + UnsubscribePattern(upstream chan *Message, pattern string) error + + // Publish one or multiple Channel. + Publish(messages ...*Message) error +} + +// Message represent single message. +type Message struct { + // Topic message been pushed into. + Topic string `json:"topic"` + + // Payload to be broadcasted. Must be valid json when transferred over RPC. + Payload json.RawMessage `json:"payload"` +} diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go new file mode 100644 index 00000000..c5761f94 --- /dev/null +++ b/plugins/broadcast/root/client.go @@ -0,0 +1,133 @@ +package broadcast + +import "sync" + +// Client subscribes to a given topic and consumes or publish messages to it. +type Client struct { + upstream chan *Message + broker Broker + mu sync.Mutex + topics []string + patterns []string +} + +// Channel returns incoming messages channel. +func (c *Client) Channel() chan *Message { + return c.upstream +} + +// Publish message into associated topic or topics. +func (c *Client) Publish(msg ...*Message) error { + return c.broker.Publish(msg...) +} + +// Subscribe client to specific topics. +func (c *Client) Subscribe(topics ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + newTopics := make([]string, 0) + for _, topic := range topics { + found := false + for _, e := range c.topics { + if e == topic { + found = true + break + } + } + + if !found { + newTopics = append(newTopics, topic) + } + } + + if len(newTopics) == 0 { + return nil + } + + c.topics = append(c.topics, newTopics...) + + return c.broker.Subscribe(c.upstream, newTopics...) +} + +// SubscribePattern subscribe client to the specific topic pattern. +func (c *Client) SubscribePattern(pattern string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for _, g := range c.patterns { + if g == pattern { + return nil + } + } + + c.patterns = append(c.patterns, pattern) + return c.broker.SubscribePattern(c.upstream, pattern) +} + +// Unsubscribe client from specific topics +func (c *Client) Unsubscribe(topics ...string) error { + c.mu.Lock() + defer c.mu.Unlock() + + dropTopics := make([]string, 0) + for _, topic := range topics { + for i, e := range c.topics { + if e == topic { + c.topics = append(c.topics[:i], c.topics[i+1:]...) + dropTopics = append(dropTopics, topic) + } + } + } + + if len(dropTopics) == 0 { + return nil + } + + return c.broker.Unsubscribe(c.upstream, dropTopics...) +} + +// UnsubscribePattern client from topic pattern. +func (c *Client) UnsubscribePattern(pattern string) error { + c.mu.Lock() + defer c.mu.Unlock() + + for i := range c.patterns { + if c.patterns[i] == pattern { + c.patterns = append(c.patterns[:i], c.patterns[i+1:]...) + + return c.broker.UnsubscribePattern(c.upstream, pattern) + } + } + + return nil +} + +// Topics return all the topics client subscribed to. +func (c *Client) Topics() []string { + c.mu.Lock() + defer c.mu.Unlock() + + return c.topics +} + +// Patterns return all the patterns client subscribed to. +func (c *Client) Patterns() []string { + c.mu.Lock() + defer c.mu.Unlock() + + return c.patterns +} + +// Close the client and consumption. +func (c *Client) Close() (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if len(c.topics) != 0 { + err = c.broker.Unsubscribe(c.upstream, c.topics...) + } + + close(c.upstream) + return err +} diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go new file mode 100644 index 00000000..52a50d57 --- /dev/null +++ b/plugins/broadcast/root/client_test.go @@ -0,0 +1,59 @@ +package broadcast + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_Client_Topics(t *testing.T) { + br, _, c := setup(`{}`) + defer c.Stop() + + client := br.NewClient() + defer client.Close() + + assert.Equal(t, []string{}, client.Topics()) + + assert.NoError(t, client.Subscribe("topic")) + assert.Equal(t, []string{"topic"}, client.Topics()) + + assert.NoError(t, client.Subscribe("topic")) + assert.Equal(t, []string{"topic"}, client.Topics()) + + assert.NoError(t, br.broker.Subscribe(client.upstream, "topic")) + assert.Equal(t, []string{"topic"}, client.Topics()) + + assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) + assert.Equal(t, `hello1`, readStr(<-client.Channel())) + + assert.NoError(t, client.Unsubscribe("topic")) + assert.NoError(t, client.Unsubscribe("topic")) + assert.NoError(t, br.broker.Unsubscribe(client.upstream, "topic")) + + assert.Equal(t, []string{}, client.Topics()) +} + +func Test_Client_Patterns(t *testing.T) { + br, _, c := setup(`{}`) + defer c.Stop() + + client := br.NewClient() + defer client.Close() + + assert.Equal(t, []string{}, client.Patterns()) + + assert.NoError(t, client.SubscribePattern("topic/*")) + assert.Equal(t, []string{"topic/*"}, client.Patterns()) + + assert.NoError(t, br.broker.SubscribePattern(client.upstream, "topic/*")) + assert.Equal(t, []string{"topic/*"}, client.Patterns()) + + assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1"))) + assert.Equal(t, `hello1`, readStr(<-client.Channel())) + + assert.NoError(t, client.UnsubscribePattern("topic/*")) + assert.NoError(t, br.broker.UnsubscribePattern(client.upstream, "topic/*")) + + assert.Equal(t, []string{}, client.Patterns()) +} diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go new file mode 100644 index 00000000..8c732441 --- /dev/null +++ b/plugins/broadcast/root/config.go @@ -0,0 +1,61 @@ +package broadcast + +import ( + "errors" + + "github.com/go-redis/redis/v8" +) + +// Config configures the broadcast extension. +type Config struct { + // RedisConfig configures redis broker. + Redis *RedisConfig +} + +// Hydrate reads the configuration values from the source configuration. +//func (c *Config) Hydrate(cfg service.Config) error { +// if err := cfg.Unmarshal(c); err != nil { +// return err +// } +// +// if c.Redis != nil { +// return c.Redis.isValid() +// } +// +// return nil +//} + +// InitDefaults enables in memory broadcast configuration. +func (c *Config) InitDefaults() error { + return nil +} + +// RedisConfig configures redis broker. +type RedisConfig struct { + // Addr of the redis server. + Addr string + + // Password to redis server. + Password string + + // DB index. + DB int +} + +// clusterOptions +func (cfg *RedisConfig) redisClient() redis.UniversalClient { + return redis.NewClient(&redis.Options{ + Addr: cfg.Addr, + Password: cfg.Password, + PoolSize: 2, + }) +} + +// check if redis config is valid. +func (cfg *RedisConfig) isValid() error { + if cfg.Addr == "" { + return errors.New("redis addr is required") + } + + return nil +} diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go new file mode 100644 index 00000000..28191c6b --- /dev/null +++ b/plugins/broadcast/root/config_test.go @@ -0,0 +1,60 @@ +package broadcast + +import ( + "encoding/json" + "testing" + + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" +) + +type testCfg struct { + rpc string + broadcast string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.broadcast} + } + + if name == rpc.ID { + return &testCfg{target: cfg.rpc} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + return json.Unmarshal([]byte(cfg.target), out) +} + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &testCfg{target: `{"dead`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_OK(t *testing.T) { + cfg := &testCfg{target: `{"path":"/path"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Redis_Error(t *testing.T) { + cfg := &testCfg{target: `{"path":"/path","redis":{}}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Redis_OK(t *testing.T) { + cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go new file mode 100644 index 00000000..91137f8b --- /dev/null +++ b/plugins/broadcast/root/router.go @@ -0,0 +1,170 @@ +package broadcast + +//import "github.com/gobwas/glob" + +// Router performs internal message routing to multiple subscribers. +type Router struct { + wildcard map[string]wildcard + routes map[string][]chan *Message +} + +// wildcard handles number of topics via glob pattern. +type wildcard struct { + //glob glob.Glob + upstream []chan *Message +} + +// helper for blocking join/leave flow +type subscriber struct { + upstream chan *Message + done chan error + topics []string + pattern string +} + +// NewRouter creates new topic and pattern router. +func NewRouter() *Router { + return &Router{ + wildcard: make(map[string]wildcard), + routes: make(map[string][]chan *Message), + } +} + +// Dispatch to all connected topics. +func (r *Router) Dispatch(msg *Message) { + for _, w := range r.wildcard { + if w.glob.Match(msg.Topic) { + for _, upstream := range w.upstream { + upstream <- msg + } + } + } + + if routes, ok := r.routes[msg.Topic]; ok { + for _, upstream := range routes { + upstream <- msg + } + } +} + +// Subscribe to topic and return list of newly assigned topics. +func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) { + newTopics = make([]string, 0) + for _, topic := range topics { + if _, ok := r.routes[topic]; !ok { + r.routes[topic] = []chan *Message{upstream} + if !r.collapsed(topic) { + newTopics = append(newTopics, topic) + } + continue + } + + joined := false + for _, up := range r.routes[topic] { + if up == upstream { + joined = true + break + } + } + + if !joined { + r.routes[topic] = append(r.routes[topic], upstream) + } + } + + return newTopics +} + +// Unsubscribe from given list of topics and return list of topics which are no longer claimed. +func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) { + dropTopics = make([]string, 0) + for _, topic := range topics { + if _, ok := r.routes[topic]; !ok { + // no such topic, ignore + continue + } + + for i := range r.routes[topic] { + if r.routes[topic][i] == upstream { + r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...) + break + } + } + + if len(r.routes[topic]) == 0 { + delete(r.routes, topic) + + // standalone empty subscription + if !r.collapsed(topic) { + dropTopics = append(dropTopics, topic) + } + } + } + + return dropTopics +} + +// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in +// case if blob is invalid. +func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) { + if w, ok := r.wildcard[pattern]; ok { + joined := false + for _, up := range w.upstream { + if up == upstream { + joined = true + break + } + } + + if !joined { + w.upstream = append(w.upstream, upstream) + } + + return nil, nil + } + + g, err := glob.Compile(pattern) + if err != nil { + return nil, err + } + + r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}} + + return []string{pattern}, nil +} + +// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed. +func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) { + // todo: store and return collapsed topics + + w, ok := r.wildcard[pattern] + if !ok { + // no such pattern + return nil + } + + for i, up := range w.upstream { + if up == upstream { + w.upstream[i] = w.upstream[len(w.upstream)-1] + w.upstream[len(w.upstream)-1] = nil + w.upstream = w.upstream[:len(w.upstream)-1] + + if len(w.upstream) == 0 { + delete(r.wildcard, pattern) + return []string{pattern} + } + } + } + + return nil +} + +func (r *Router) collapsed(topic string) bool { + for _, w := range r.wildcard { + if w.glob.Match(topic) { + return true + } + } + + return false +} diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go new file mode 100644 index 00000000..5604a574 --- /dev/null +++ b/plugins/broadcast/root/rpc.go @@ -0,0 +1,25 @@ +package broadcast + +import "golang.org/x/sync/errgroup" + +type rpcService struct { + svc *Service +} + +// Publish Messages. +func (r *rpcService) Publish(msg []*Message, ok *bool) error { + *ok = true + return r.svc.Publish(msg...) +} + +// Publish Messages in async mode. Blocks until get an err or nil from publish +func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error { + *ok = true + g := &errgroup.Group{} + + g.Go(func() error { + return r.svc.Publish(msg...) + }) + + return g.Wait() +} diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go new file mode 100644 index 00000000..157c4e70 --- /dev/null +++ b/plugins/broadcast/root/rpc_test.go @@ -0,0 +1,72 @@ +package broadcast + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRPC_Broadcast(t *testing.T) { + br, rpc, c := setup(`{}`) + defer c.Stop() + + client := br.NewClient() + defer client.Close() + + rcpClient, err := rpc.Client() + assert.NoError(t, err) + + // must not be delivered + ok := false + assert.NoError(t, rcpClient.Call( + "broadcast.Publish", + []*Message{newMessage("topic", `"hello1"`)}, + &ok, + )) + assert.True(t, ok) + + assert.NoError(t, client.Subscribe("topic")) + + assert.NoError(t, rcpClient.Call( + "broadcast.Publish", + []*Message{newMessage("topic", `"hello1"`)}, + &ok, + )) + assert.True(t, ok) + assert.Equal(t, `"hello1"`, readStr(<-client.Channel())) + + assert.NoError(t, rcpClient.Call( + "broadcast.Publish", + []*Message{newMessage("topic", `"hello2"`)}, + &ok, + )) + assert.True(t, ok) + assert.Equal(t, `"hello2"`, readStr(<-client.Channel())) + + assert.NoError(t, client.Unsubscribe("topic")) + + assert.NoError(t, rcpClient.Call( + "broadcast.Publish", + []*Message{newMessage("topic", `"hello3"`)}, + &ok, + )) + assert.True(t, ok) + + assert.NoError(t, client.Subscribe("topic")) + + assert.NoError(t, rcpClient.Call( + "broadcast.Publish", + []*Message{newMessage("topic", `"hello4"`)}, + &ok, + )) + assert.True(t, ok) + assert.Equal(t, `"hello4"`, readStr(<-client.Channel())) + + assert.NoError(t, rcpClient.Call( + "broadcast.PublishAsync", + []*Message{newMessage("topic", `"hello5"`)}, + &ok, + )) + assert.True(t, ok) + assert.Equal(t, `"hello5"`, readStr(<-client.Channel())) +} diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go new file mode 100644 index 00000000..8b175b3e --- /dev/null +++ b/plugins/broadcast/root/service.go @@ -0,0 +1,85 @@ +package broadcast + +import ( + "errors" + "sync" + + "github.com/spiral/roadrunner/service/rpc" +) + +// ID defines public service name. +const ID = "broadcast" + +// Service manages even broadcasting and websocket interface. +type Service struct { + // service and broker configuration + cfg *Config + + // broker + mu sync.Mutex + broker Broker +} + +// Init service. +func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) { + s.cfg = cfg + + if rpc != nil { + if err := rpc.Register(ID, &rpcService{svc: s}); err != nil { + return false, err + } + } + + s.mu.Lock() + if s.cfg.Redis != nil { + if s.broker, err = redisBroker(s.cfg.Redis); err != nil { + return false, err + } + } else { + s.broker = memoryBroker() + } + s.mu.Unlock() + + return true, nil +} + +// Serve broadcast broker. +func (s *Service) Serve() (err error) { + return s.broker.Serve() +} + +// Stop closes broadcast broker. +func (s *Service) Stop() { + broker := s.Broker() + if broker != nil { + broker.Stop() + } +} + +// Broker returns associated broker. +func (s *Service) Broker() Broker { + s.mu.Lock() + defer s.mu.Unlock() + + return s.broker +} + +// NewClient returns single connected client with ability to consume or produce into associated topic(svc). +func (s *Service) NewClient() *Client { + return &Client{ + upstream: make(chan *Message), + broker: s.Broker(), + topics: make([]string, 0), + patterns: make([]string, 0), + } +} + +// Publish one or multiple Channel. +func (s *Service) Publish(msg ...*Message) error { + broker := s.Broker() + if broker == nil { + return errors.New("no stopped broker") + } + + return s.Broker().Publish(msg...) +} diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go new file mode 100644 index 00000000..10b924cc --- /dev/null +++ b/plugins/broadcast/root/service_test.go @@ -0,0 +1,65 @@ +package broadcast + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" +) + +var rpcPort = 6010 + +func setup(cfg string) (*Service, *rpc.Service, service.Container) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{ + broadcast: cfg, + rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort), + }) + + rpcPort++ + + if err != nil { + panic(err) + } + + go func() { + err = c.Serve() + if err != nil { + panic(err) + } + }() + time.Sleep(time.Millisecond * 100) + + b, _ := c.Get(ID) + br := b.(*Service) + + r, _ := c.Get(rpc.ID) + rp := r.(*rpc.Service) + + return br, rp, c +} + +func readStr(m *Message) string { + return strings.TrimRight(string(m.Payload), "\n") +} + +func newMessage(t, m string) *Message { + return &Message{Topic: t, Payload: []byte(m)} +} + +func TestService_Publish(t *testing.T) { + svc := &Service{} + assert.Error(t, svc.Publish(nil)) +} diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml new file mode 100644 index 00000000..c35a12fc --- /dev/null +++ b/plugins/broadcast/root/tests/.rr.yaml @@ -0,0 +1,2 @@ +broadcast: + redis.addr: "localhost:6379"
\ No newline at end of file diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php new file mode 100644 index 00000000..d6014bf0 --- /dev/null +++ b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php @@ -0,0 +1,56 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Broadcast\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Broadcast\Broadcast; +use Spiral\Broadcast\Exception\BroadcastException; +use Spiral\Broadcast\Message; +use Spiral\Goridge\RPC; +use Spiral\Goridge\SocketRelay; + +class BroadcastTest extends TestCase +{ + public function testBroadcast(): void + { + $rpc = new RPC(new SocketRelay('localhost', 6001)); + $br = new Broadcast($rpc); + + $br->publish( + new Message('tests/topic', 'hello'), + new Message('tests/123', ['key' => 'value']) + ); + + while (filesize(__DIR__ . '/../log.txt') < 40) { + clearstatcache(true, __DIR__ . '/../log.txt'); + usleep(1000); + } + + clearstatcache(true, __DIR__ . '/../log.txt'); + $content = file_get_contents(__DIR__ . '/../log.txt'); + + $this->assertSame('tests/topic: "hello" +tests/123: {"key":"value"} +', $content); + } + + public function testBroadcastException(): void + { + $rpc = new RPC(new SocketRelay('localhost', 6002)); + $br = new Broadcast($rpc); + + $this->expectException(BroadcastException::class); + $br->publish( + new Message('topic', 'hello') + ); + } +} diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php new file mode 100644 index 00000000..dd9e1cc3 --- /dev/null +++ b/plugins/broadcast/root/tests/Broadcast/MessageTest.php @@ -0,0 +1,24 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\Broadcast\Tests; + +use PHPUnit\Framework\TestCase; +use Spiral\Broadcast\Message; + +class MessageTest extends TestCase +{ + public function testSerialize(): void + { + $m = new Message('topic', ['hello' => 'world']); + $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m)); + } +} diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php new file mode 100644 index 00000000..d0dfb88b --- /dev/null +++ b/plugins/broadcast/root/tests/bootstrap.php @@ -0,0 +1,15 @@ +<?php + +/** + * Spiral Framework, SpiralScout LLC. + * + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +error_reporting(E_ALL | E_STRICT); +ini_set('display_errors', 'stderr'); + +//Composer +require dirname(__DIR__) . '/vendor_php/autoload.php'; diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml new file mode 100644 index 00000000..123aa9b9 --- /dev/null +++ b/plugins/broadcast/root/tests/docker-compose.yml @@ -0,0 +1,9 @@ +version: '3' + +services: + redis: + image: 'bitnami/redis:latest' + environment: + - ALLOW_EMPTY_PASSWORD=yes + ports: + - "6379:6379"
\ No newline at end of file diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go new file mode 100644 index 00000000..21442a01 --- /dev/null +++ b/plugins/broadcast/root/tests/go-client.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "os" + + "github.com/spiral/broadcast/v2" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/service/rpc" + "golang.org/x/sync/errgroup" +) + +type logService struct { + broadcast *broadcast.Service + stop chan interface{} +} + +func (l *logService) Init(service *broadcast.Service) (bool, error) { + l.broadcast = service + + return true, nil +} + +func (l *logService) Serve() error { + l.stop = make(chan interface{}) + + client := l.broadcast.NewClient() + if err := client.SubscribePattern("tests/*"); err != nil { + return err + } + + logFile, _ := os.Create("log.txt") + + g := &errgroup.Group{} + g.Go(func() error { + for msg := range client.Channel() { + _, err := logFile.Write([]byte(fmt.Sprintf( + "%s: %s\n", + msg.Topic, + string(msg.Payload), + ))) + if err != nil { + return err + } + + err = logFile.Sync() + if err != nil { + return err + } + } + return nil + }) + + <-l.stop + err := logFile.Close() + if err != nil { + return err + } + + err = client.Close() + if err != nil { + return err + } + + return g.Wait() +} + +func (l *logService) Stop() { + close(l.stop) +} + +func main() { + rr.Container.Register(rpc.ID, &rpc.Service{}) + rr.Container.Register(broadcast.ID, &broadcast.Service{}) + rr.Container.Register("log", &logService{}) + + rr.Execute() +} |