diff options
Diffstat (limited to 'plugins/broadcast/root')
-rw-r--r-- | plugins/broadcast/root/Makefile | 9 | ||||
-rw-r--r-- | plugins/broadcast/root/broker.go | 36 | ||||
-rw-r--r-- | plugins/broadcast/root/client.go | 133 | ||||
-rw-r--r-- | plugins/broadcast/root/client_test.go | 59 | ||||
-rw-r--r-- | plugins/broadcast/root/config.go | 61 | ||||
-rw-r--r-- | plugins/broadcast/root/config_test.go | 60 | ||||
-rw-r--r-- | plugins/broadcast/root/router.go | 170 | ||||
-rw-r--r-- | plugins/broadcast/root/rpc.go | 25 | ||||
-rw-r--r-- | plugins/broadcast/root/rpc_test.go | 72 | ||||
-rw-r--r-- | plugins/broadcast/root/service.go | 85 | ||||
-rw-r--r-- | plugins/broadcast/root/service_test.go | 65 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/.rr.yaml | 2 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/Broadcast/BroadcastTest.php | 56 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/Broadcast/MessageTest.php | 24 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/bootstrap.php | 15 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/docker-compose.yml | 9 | ||||
-rw-r--r-- | plugins/broadcast/root/tests/go-client.go | 78 |
17 files changed, 0 insertions, 959 deletions
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile deleted file mode 100644 index d88312d2..00000000 --- a/plugins/broadcast/root/Makefile +++ /dev/null @@ -1,9 +0,0 @@ -clean: - rm -rf rr-jobbroadcast -install: all - cp rr-broadcast /usr/local/bin/rr-broadcast -uninstall: - rm -f /usr/local/bin/rr-broadcast -test: - composer update - go test -v -race -cover diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/root/broker.go deleted file mode 100644 index 923c8105..00000000 --- a/plugins/broadcast/root/broker.go +++ /dev/null @@ -1,36 +0,0 @@ -package broadcast - -import "encoding/json" - -// Broker defines the ability to operate as message passing broker. -type Broker interface { - // Serve serves broker. - Serve() error - - // Stop closes the consumption and disconnects broker. - Stop() - - // Subscribe broker to one or multiple topics. - Subscribe(upstream chan *Message, topics ...string) error - - // SubscribePattern broker to pattern. - SubscribePattern(upstream chan *Message, pattern string) error - - // Unsubscribe broker from one or multiple topics. - Unsubscribe(upstream chan *Message, topics ...string) error - - // UnsubscribePattern broker from pattern. - UnsubscribePattern(upstream chan *Message, pattern string) error - - // Publish one or multiple Channel. - Publish(messages ...*Message) error -} - -// Message represent single message. -type Message struct { - // Topic message been pushed into. - Topic string `json:"topic"` - - // Payload to be broadcasted. Must be valid json when transferred over RPC. - Payload json.RawMessage `json:"payload"` -} diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go deleted file mode 100644 index c5761f94..00000000 --- a/plugins/broadcast/root/client.go +++ /dev/null @@ -1,133 +0,0 @@ -package broadcast - -import "sync" - -// Client subscribes to a given topic and consumes or publish messages to it. -type Client struct { - upstream chan *Message - broker Broker - mu sync.Mutex - topics []string - patterns []string -} - -// Channel returns incoming messages channel. -func (c *Client) Channel() chan *Message { - return c.upstream -} - -// Publish message into associated topic or topics. -func (c *Client) Publish(msg ...*Message) error { - return c.broker.Publish(msg...) -} - -// Subscribe client to specific topics. -func (c *Client) Subscribe(topics ...string) error { - c.mu.Lock() - defer c.mu.Unlock() - - newTopics := make([]string, 0) - for _, topic := range topics { - found := false - for _, e := range c.topics { - if e == topic { - found = true - break - } - } - - if !found { - newTopics = append(newTopics, topic) - } - } - - if len(newTopics) == 0 { - return nil - } - - c.topics = append(c.topics, newTopics...) - - return c.broker.Subscribe(c.upstream, newTopics...) -} - -// SubscribePattern subscribe client to the specific topic pattern. -func (c *Client) SubscribePattern(pattern string) error { - c.mu.Lock() - defer c.mu.Unlock() - - for _, g := range c.patterns { - if g == pattern { - return nil - } - } - - c.patterns = append(c.patterns, pattern) - return c.broker.SubscribePattern(c.upstream, pattern) -} - -// Unsubscribe client from specific topics -func (c *Client) Unsubscribe(topics ...string) error { - c.mu.Lock() - defer c.mu.Unlock() - - dropTopics := make([]string, 0) - for _, topic := range topics { - for i, e := range c.topics { - if e == topic { - c.topics = append(c.topics[:i], c.topics[i+1:]...) - dropTopics = append(dropTopics, topic) - } - } - } - - if len(dropTopics) == 0 { - return nil - } - - return c.broker.Unsubscribe(c.upstream, dropTopics...) -} - -// UnsubscribePattern client from topic pattern. -func (c *Client) UnsubscribePattern(pattern string) error { - c.mu.Lock() - defer c.mu.Unlock() - - for i := range c.patterns { - if c.patterns[i] == pattern { - c.patterns = append(c.patterns[:i], c.patterns[i+1:]...) - - return c.broker.UnsubscribePattern(c.upstream, pattern) - } - } - - return nil -} - -// Topics return all the topics client subscribed to. -func (c *Client) Topics() []string { - c.mu.Lock() - defer c.mu.Unlock() - - return c.topics -} - -// Patterns return all the patterns client subscribed to. -func (c *Client) Patterns() []string { - c.mu.Lock() - defer c.mu.Unlock() - - return c.patterns -} - -// Close the client and consumption. -func (c *Client) Close() (err error) { - c.mu.Lock() - defer c.mu.Unlock() - - if len(c.topics) != 0 { - err = c.broker.Unsubscribe(c.upstream, c.topics...) - } - - close(c.upstream) - return err -} diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go deleted file mode 100644 index 52a50d57..00000000 --- a/plugins/broadcast/root/client_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package broadcast - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_Client_Topics(t *testing.T) { - br, _, c := setup(`{}`) - defer c.Stop() - - client := br.NewClient() - defer client.Close() - - assert.Equal(t, []string{}, client.Topics()) - - assert.NoError(t, client.Subscribe("topic")) - assert.Equal(t, []string{"topic"}, client.Topics()) - - assert.NoError(t, client.Subscribe("topic")) - assert.Equal(t, []string{"topic"}, client.Topics()) - - assert.NoError(t, br.broker.Subscribe(client.upstream, "topic")) - assert.Equal(t, []string{"topic"}, client.Topics()) - - assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1"))) - assert.Equal(t, `hello1`, readStr(<-client.Channel())) - - assert.NoError(t, client.Unsubscribe("topic")) - assert.NoError(t, client.Unsubscribe("topic")) - assert.NoError(t, br.broker.Unsubscribe(client.upstream, "topic")) - - assert.Equal(t, []string{}, client.Topics()) -} - -func Test_Client_Patterns(t *testing.T) { - br, _, c := setup(`{}`) - defer c.Stop() - - client := br.NewClient() - defer client.Close() - - assert.Equal(t, []string{}, client.Patterns()) - - assert.NoError(t, client.SubscribePattern("topic/*")) - assert.Equal(t, []string{"topic/*"}, client.Patterns()) - - assert.NoError(t, br.broker.SubscribePattern(client.upstream, "topic/*")) - assert.Equal(t, []string{"topic/*"}, client.Patterns()) - - assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1"))) - assert.Equal(t, `hello1`, readStr(<-client.Channel())) - - assert.NoError(t, client.UnsubscribePattern("topic/*")) - assert.NoError(t, br.broker.UnsubscribePattern(client.upstream, "topic/*")) - - assert.Equal(t, []string{}, client.Patterns()) -} diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go deleted file mode 100644 index 8c732441..00000000 --- a/plugins/broadcast/root/config.go +++ /dev/null @@ -1,61 +0,0 @@ -package broadcast - -import ( - "errors" - - "github.com/go-redis/redis/v8" -) - -// Config configures the broadcast extension. -type Config struct { - // RedisConfig configures redis broker. - Redis *RedisConfig -} - -// Hydrate reads the configuration values from the source configuration. -//func (c *Config) Hydrate(cfg service.Config) error { -// if err := cfg.Unmarshal(c); err != nil { -// return err -// } -// -// if c.Redis != nil { -// return c.Redis.isValid() -// } -// -// return nil -//} - -// InitDefaults enables in memory broadcast configuration. -func (c *Config) InitDefaults() error { - return nil -} - -// RedisConfig configures redis broker. -type RedisConfig struct { - // Addr of the redis server. - Addr string - - // Password to redis server. - Password string - - // DB index. - DB int -} - -// clusterOptions -func (cfg *RedisConfig) redisClient() redis.UniversalClient { - return redis.NewClient(&redis.Options{ - Addr: cfg.Addr, - Password: cfg.Password, - PoolSize: 2, - }) -} - -// check if redis config is valid. -func (cfg *RedisConfig) isValid() error { - if cfg.Addr == "" { - return errors.New("redis addr is required") - } - - return nil -} diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go deleted file mode 100644 index 28191c6b..00000000 --- a/plugins/broadcast/root/config_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package broadcast - -import ( - "encoding/json" - "testing" - - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" -) - -type testCfg struct { - rpc string - broadcast string - target string -} - -func (cfg *testCfg) Get(name string) service.Config { - if name == ID { - return &testCfg{target: cfg.broadcast} - } - - if name == rpc.ID { - return &testCfg{target: cfg.rpc} - } - - return nil -} - -func (cfg *testCfg) Unmarshal(out interface{}) error { - return json.Unmarshal([]byte(cfg.target), out) -} - -func Test_Config_Hydrate_Error(t *testing.T) { - cfg := &testCfg{target: `{"dead`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_OK(t *testing.T) { - cfg := &testCfg{target: `{"path":"/path"}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Redis_Error(t *testing.T) { - cfg := &testCfg{target: `{"path":"/path","redis":{}}`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Redis_OK(t *testing.T) { - cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go deleted file mode 100644 index 91137f8b..00000000 --- a/plugins/broadcast/root/router.go +++ /dev/null @@ -1,170 +0,0 @@ -package broadcast - -//import "github.com/gobwas/glob" - -// Router performs internal message routing to multiple subscribers. -type Router struct { - wildcard map[string]wildcard - routes map[string][]chan *Message -} - -// wildcard handles number of topics via glob pattern. -type wildcard struct { - //glob glob.Glob - upstream []chan *Message -} - -// helper for blocking join/leave flow -type subscriber struct { - upstream chan *Message - done chan error - topics []string - pattern string -} - -// NewRouter creates new topic and pattern router. -func NewRouter() *Router { - return &Router{ - wildcard: make(map[string]wildcard), - routes: make(map[string][]chan *Message), - } -} - -// Dispatch to all connected topics. -func (r *Router) Dispatch(msg *Message) { - for _, w := range r.wildcard { - if w.glob.Match(msg.Topic) { - for _, upstream := range w.upstream { - upstream <- msg - } - } - } - - if routes, ok := r.routes[msg.Topic]; ok { - for _, upstream := range routes { - upstream <- msg - } - } -} - -// Subscribe to topic and return list of newly assigned topics. -func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) { - newTopics = make([]string, 0) - for _, topic := range topics { - if _, ok := r.routes[topic]; !ok { - r.routes[topic] = []chan *Message{upstream} - if !r.collapsed(topic) { - newTopics = append(newTopics, topic) - } - continue - } - - joined := false - for _, up := range r.routes[topic] { - if up == upstream { - joined = true - break - } - } - - if !joined { - r.routes[topic] = append(r.routes[topic], upstream) - } - } - - return newTopics -} - -// Unsubscribe from given list of topics and return list of topics which are no longer claimed. -func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) { - dropTopics = make([]string, 0) - for _, topic := range topics { - if _, ok := r.routes[topic]; !ok { - // no such topic, ignore - continue - } - - for i := range r.routes[topic] { - if r.routes[topic][i] == upstream { - r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...) - break - } - } - - if len(r.routes[topic]) == 0 { - delete(r.routes, topic) - - // standalone empty subscription - if !r.collapsed(topic) { - dropTopics = append(dropTopics, topic) - } - } - } - - return dropTopics -} - -// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in -// case if blob is invalid. -func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) { - if w, ok := r.wildcard[pattern]; ok { - joined := false - for _, up := range w.upstream { - if up == upstream { - joined = true - break - } - } - - if !joined { - w.upstream = append(w.upstream, upstream) - } - - return nil, nil - } - - g, err := glob.Compile(pattern) - if err != nil { - return nil, err - } - - r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}} - - return []string{pattern}, nil -} - -// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed. -func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) { - // todo: store and return collapsed topics - - w, ok := r.wildcard[pattern] - if !ok { - // no such pattern - return nil - } - - for i, up := range w.upstream { - if up == upstream { - w.upstream[i] = w.upstream[len(w.upstream)-1] - w.upstream[len(w.upstream)-1] = nil - w.upstream = w.upstream[:len(w.upstream)-1] - - if len(w.upstream) == 0 { - delete(r.wildcard, pattern) - return []string{pattern} - } - } - } - - return nil -} - -func (r *Router) collapsed(topic string) bool { - for _, w := range r.wildcard { - if w.glob.Match(topic) { - return true - } - } - - return false -} diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go deleted file mode 100644 index 5604a574..00000000 --- a/plugins/broadcast/root/rpc.go +++ /dev/null @@ -1,25 +0,0 @@ -package broadcast - -import "golang.org/x/sync/errgroup" - -type rpcService struct { - svc *Service -} - -// Publish Messages. -func (r *rpcService) Publish(msg []*Message, ok *bool) error { - *ok = true - return r.svc.Publish(msg...) -} - -// Publish Messages in async mode. Blocks until get an err or nil from publish -func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error { - *ok = true - g := &errgroup.Group{} - - g.Go(func() error { - return r.svc.Publish(msg...) - }) - - return g.Wait() -} diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go deleted file mode 100644 index 157c4e70..00000000 --- a/plugins/broadcast/root/rpc_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package broadcast - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRPC_Broadcast(t *testing.T) { - br, rpc, c := setup(`{}`) - defer c.Stop() - - client := br.NewClient() - defer client.Close() - - rcpClient, err := rpc.Client() - assert.NoError(t, err) - - // must not be delivered - ok := false - assert.NoError(t, rcpClient.Call( - "broadcast.Publish", - []*Message{newMessage("topic", `"hello1"`)}, - &ok, - )) - assert.True(t, ok) - - assert.NoError(t, client.Subscribe("topic")) - - assert.NoError(t, rcpClient.Call( - "broadcast.Publish", - []*Message{newMessage("topic", `"hello1"`)}, - &ok, - )) - assert.True(t, ok) - assert.Equal(t, `"hello1"`, readStr(<-client.Channel())) - - assert.NoError(t, rcpClient.Call( - "broadcast.Publish", - []*Message{newMessage("topic", `"hello2"`)}, - &ok, - )) - assert.True(t, ok) - assert.Equal(t, `"hello2"`, readStr(<-client.Channel())) - - assert.NoError(t, client.Unsubscribe("topic")) - - assert.NoError(t, rcpClient.Call( - "broadcast.Publish", - []*Message{newMessage("topic", `"hello3"`)}, - &ok, - )) - assert.True(t, ok) - - assert.NoError(t, client.Subscribe("topic")) - - assert.NoError(t, rcpClient.Call( - "broadcast.Publish", - []*Message{newMessage("topic", `"hello4"`)}, - &ok, - )) - assert.True(t, ok) - assert.Equal(t, `"hello4"`, readStr(<-client.Channel())) - - assert.NoError(t, rcpClient.Call( - "broadcast.PublishAsync", - []*Message{newMessage("topic", `"hello5"`)}, - &ok, - )) - assert.True(t, ok) - assert.Equal(t, `"hello5"`, readStr(<-client.Channel())) -} diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go deleted file mode 100644 index 8b175b3e..00000000 --- a/plugins/broadcast/root/service.go +++ /dev/null @@ -1,85 +0,0 @@ -package broadcast - -import ( - "errors" - "sync" - - "github.com/spiral/roadrunner/service/rpc" -) - -// ID defines public service name. -const ID = "broadcast" - -// Service manages even broadcasting and websocket interface. -type Service struct { - // service and broker configuration - cfg *Config - - // broker - mu sync.Mutex - broker Broker -} - -// Init service. -func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) { - s.cfg = cfg - - if rpc != nil { - if err := rpc.Register(ID, &rpcService{svc: s}); err != nil { - return false, err - } - } - - s.mu.Lock() - if s.cfg.Redis != nil { - if s.broker, err = redisBroker(s.cfg.Redis); err != nil { - return false, err - } - } else { - s.broker = memoryBroker() - } - s.mu.Unlock() - - return true, nil -} - -// Serve broadcast broker. -func (s *Service) Serve() (err error) { - return s.broker.Serve() -} - -// Stop closes broadcast broker. -func (s *Service) Stop() { - broker := s.Broker() - if broker != nil { - broker.Stop() - } -} - -// Broker returns associated broker. -func (s *Service) Broker() Broker { - s.mu.Lock() - defer s.mu.Unlock() - - return s.broker -} - -// NewClient returns single connected client with ability to consume or produce into associated topic(svc). -func (s *Service) NewClient() *Client { - return &Client{ - upstream: make(chan *Message), - broker: s.Broker(), - topics: make([]string, 0), - patterns: make([]string, 0), - } -} - -// Publish one or multiple Channel. -func (s *Service) Publish(msg ...*Message) error { - broker := s.Broker() - if broker == nil { - return errors.New("no stopped broker") - } - - return s.Broker().Publish(msg...) -} diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go deleted file mode 100644 index 10b924cc..00000000 --- a/plugins/broadcast/root/service_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package broadcast - -import ( - "fmt" - "strings" - "testing" - "time" - - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" -) - -var rpcPort = 6010 - -func setup(cfg string) (*Service, *rpc.Service, service.Container) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rpc.ID, &rpc.Service{}) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{ - broadcast: cfg, - rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort), - }) - - rpcPort++ - - if err != nil { - panic(err) - } - - go func() { - err = c.Serve() - if err != nil { - panic(err) - } - }() - time.Sleep(time.Millisecond * 100) - - b, _ := c.Get(ID) - br := b.(*Service) - - r, _ := c.Get(rpc.ID) - rp := r.(*rpc.Service) - - return br, rp, c -} - -func readStr(m *Message) string { - return strings.TrimRight(string(m.Payload), "\n") -} - -func newMessage(t, m string) *Message { - return &Message{Topic: t, Payload: []byte(m)} -} - -func TestService_Publish(t *testing.T) { - svc := &Service{} - assert.Error(t, svc.Publish(nil)) -} diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml deleted file mode 100644 index c35a12fc..00000000 --- a/plugins/broadcast/root/tests/.rr.yaml +++ /dev/null @@ -1,2 +0,0 @@ -broadcast: - redis.addr: "localhost:6379"
\ No newline at end of file diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php deleted file mode 100644 index d6014bf0..00000000 --- a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php +++ /dev/null @@ -1,56 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ - -declare(strict_types=1); - -namespace Spiral\Broadcast\Tests; - -use PHPUnit\Framework\TestCase; -use Spiral\Broadcast\Broadcast; -use Spiral\Broadcast\Exception\BroadcastException; -use Spiral\Broadcast\Message; -use Spiral\Goridge\RPC; -use Spiral\Goridge\SocketRelay; - -class BroadcastTest extends TestCase -{ - public function testBroadcast(): void - { - $rpc = new RPC(new SocketRelay('localhost', 6001)); - $br = new Broadcast($rpc); - - $br->publish( - new Message('tests/topic', 'hello'), - new Message('tests/123', ['key' => 'value']) - ); - - while (filesize(__DIR__ . '/../log.txt') < 40) { - clearstatcache(true, __DIR__ . '/../log.txt'); - usleep(1000); - } - - clearstatcache(true, __DIR__ . '/../log.txt'); - $content = file_get_contents(__DIR__ . '/../log.txt'); - - $this->assertSame('tests/topic: "hello" -tests/123: {"key":"value"} -', $content); - } - - public function testBroadcastException(): void - { - $rpc = new RPC(new SocketRelay('localhost', 6002)); - $br = new Broadcast($rpc); - - $this->expectException(BroadcastException::class); - $br->publish( - new Message('topic', 'hello') - ); - } -} diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php deleted file mode 100644 index dd9e1cc3..00000000 --- a/plugins/broadcast/root/tests/Broadcast/MessageTest.php +++ /dev/null @@ -1,24 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ - -declare(strict_types=1); - -namespace Spiral\Broadcast\Tests; - -use PHPUnit\Framework\TestCase; -use Spiral\Broadcast\Message; - -class MessageTest extends TestCase -{ - public function testSerialize(): void - { - $m = new Message('topic', ['hello' => 'world']); - $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m)); - } -} diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php deleted file mode 100644 index d0dfb88b..00000000 --- a/plugins/broadcast/root/tests/bootstrap.php +++ /dev/null @@ -1,15 +0,0 @@ -<?php - -/** - * Spiral Framework, SpiralScout LLC. - * - * @author Anton Titov (Wolfy-J) - */ - -declare(strict_types=1); - -error_reporting(E_ALL | E_STRICT); -ini_set('display_errors', 'stderr'); - -//Composer -require dirname(__DIR__) . '/vendor_php/autoload.php'; diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml deleted file mode 100644 index 123aa9b9..00000000 --- a/plugins/broadcast/root/tests/docker-compose.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: '3' - -services: - redis: - image: 'bitnami/redis:latest' - environment: - - ALLOW_EMPTY_PASSWORD=yes - ports: - - "6379:6379"
\ No newline at end of file diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go deleted file mode 100644 index 21442a01..00000000 --- a/plugins/broadcast/root/tests/go-client.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "fmt" - "os" - - "github.com/spiral/broadcast/v2" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/service/rpc" - "golang.org/x/sync/errgroup" -) - -type logService struct { - broadcast *broadcast.Service - stop chan interface{} -} - -func (l *logService) Init(service *broadcast.Service) (bool, error) { - l.broadcast = service - - return true, nil -} - -func (l *logService) Serve() error { - l.stop = make(chan interface{}) - - client := l.broadcast.NewClient() - if err := client.SubscribePattern("tests/*"); err != nil { - return err - } - - logFile, _ := os.Create("log.txt") - - g := &errgroup.Group{} - g.Go(func() error { - for msg := range client.Channel() { - _, err := logFile.Write([]byte(fmt.Sprintf( - "%s: %s\n", - msg.Topic, - string(msg.Payload), - ))) - if err != nil { - return err - } - - err = logFile.Sync() - if err != nil { - return err - } - } - return nil - }) - - <-l.stop - err := logFile.Close() - if err != nil { - return err - } - - err = client.Close() - if err != nil { - return err - } - - return g.Wait() -} - -func (l *logService) Stop() { - close(l.stop) -} - -func main() { - rr.Container.Register(rpc.ID, &rpc.Service{}) - rr.Container.Register(broadcast.ID, &broadcast.Service{}) - rr.Container.Register("log", &logService{}) - - rr.Execute() -} |