summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/root')
-rw-r--r--plugins/broadcast/root/Makefile9
-rw-r--r--plugins/broadcast/root/broker.go36
-rw-r--r--plugins/broadcast/root/client.go133
-rw-r--r--plugins/broadcast/root/client_test.go59
-rw-r--r--plugins/broadcast/root/config.go61
-rw-r--r--plugins/broadcast/root/config_test.go60
-rw-r--r--plugins/broadcast/root/router.go170
-rw-r--r--plugins/broadcast/root/rpc.go25
-rw-r--r--plugins/broadcast/root/rpc_test.go72
-rw-r--r--plugins/broadcast/root/service.go85
-rw-r--r--plugins/broadcast/root/service_test.go65
-rw-r--r--plugins/broadcast/root/tests/.rr.yaml2
-rw-r--r--plugins/broadcast/root/tests/Broadcast/BroadcastTest.php56
-rw-r--r--plugins/broadcast/root/tests/Broadcast/MessageTest.php24
-rw-r--r--plugins/broadcast/root/tests/bootstrap.php15
-rw-r--r--plugins/broadcast/root/tests/docker-compose.yml9
-rw-r--r--plugins/broadcast/root/tests/go-client.go78
17 files changed, 0 insertions, 959 deletions
diff --git a/plugins/broadcast/root/Makefile b/plugins/broadcast/root/Makefile
deleted file mode 100644
index d88312d2..00000000
--- a/plugins/broadcast/root/Makefile
+++ /dev/null
@@ -1,9 +0,0 @@
-clean:
- rm -rf rr-jobbroadcast
-install: all
- cp rr-broadcast /usr/local/bin/rr-broadcast
-uninstall:
- rm -f /usr/local/bin/rr-broadcast
-test:
- composer update
- go test -v -race -cover
diff --git a/plugins/broadcast/root/broker.go b/plugins/broadcast/root/broker.go
deleted file mode 100644
index 923c8105..00000000
--- a/plugins/broadcast/root/broker.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package broadcast
-
-import "encoding/json"
-
-// Broker defines the ability to operate as message passing broker.
-type Broker interface {
- // Serve serves broker.
- Serve() error
-
- // Stop closes the consumption and disconnects broker.
- Stop()
-
- // Subscribe broker to one or multiple topics.
- Subscribe(upstream chan *Message, topics ...string) error
-
- // SubscribePattern broker to pattern.
- SubscribePattern(upstream chan *Message, pattern string) error
-
- // Unsubscribe broker from one or multiple topics.
- Unsubscribe(upstream chan *Message, topics ...string) error
-
- // UnsubscribePattern broker from pattern.
- UnsubscribePattern(upstream chan *Message, pattern string) error
-
- // Publish one or multiple Channel.
- Publish(messages ...*Message) error
-}
-
-// Message represent single message.
-type Message struct {
- // Topic message been pushed into.
- Topic string `json:"topic"`
-
- // Payload to be broadcasted. Must be valid json when transferred over RPC.
- Payload json.RawMessage `json:"payload"`
-}
diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go
deleted file mode 100644
index c5761f94..00000000
--- a/plugins/broadcast/root/client.go
+++ /dev/null
@@ -1,133 +0,0 @@
-package broadcast
-
-import "sync"
-
-// Client subscribes to a given topic and consumes or publish messages to it.
-type Client struct {
- upstream chan *Message
- broker Broker
- mu sync.Mutex
- topics []string
- patterns []string
-}
-
-// Channel returns incoming messages channel.
-func (c *Client) Channel() chan *Message {
- return c.upstream
-}
-
-// Publish message into associated topic or topics.
-func (c *Client) Publish(msg ...*Message) error {
- return c.broker.Publish(msg...)
-}
-
-// Subscribe client to specific topics.
-func (c *Client) Subscribe(topics ...string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- newTopics := make([]string, 0)
- for _, topic := range topics {
- found := false
- for _, e := range c.topics {
- if e == topic {
- found = true
- break
- }
- }
-
- if !found {
- newTopics = append(newTopics, topic)
- }
- }
-
- if len(newTopics) == 0 {
- return nil
- }
-
- c.topics = append(c.topics, newTopics...)
-
- return c.broker.Subscribe(c.upstream, newTopics...)
-}
-
-// SubscribePattern subscribe client to the specific topic pattern.
-func (c *Client) SubscribePattern(pattern string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- for _, g := range c.patterns {
- if g == pattern {
- return nil
- }
- }
-
- c.patterns = append(c.patterns, pattern)
- return c.broker.SubscribePattern(c.upstream, pattern)
-}
-
-// Unsubscribe client from specific topics
-func (c *Client) Unsubscribe(topics ...string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- dropTopics := make([]string, 0)
- for _, topic := range topics {
- for i, e := range c.topics {
- if e == topic {
- c.topics = append(c.topics[:i], c.topics[i+1:]...)
- dropTopics = append(dropTopics, topic)
- }
- }
- }
-
- if len(dropTopics) == 0 {
- return nil
- }
-
- return c.broker.Unsubscribe(c.upstream, dropTopics...)
-}
-
-// UnsubscribePattern client from topic pattern.
-func (c *Client) UnsubscribePattern(pattern string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- for i := range c.patterns {
- if c.patterns[i] == pattern {
- c.patterns = append(c.patterns[:i], c.patterns[i+1:]...)
-
- return c.broker.UnsubscribePattern(c.upstream, pattern)
- }
- }
-
- return nil
-}
-
-// Topics return all the topics client subscribed to.
-func (c *Client) Topics() []string {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- return c.topics
-}
-
-// Patterns return all the patterns client subscribed to.
-func (c *Client) Patterns() []string {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- return c.patterns
-}
-
-// Close the client and consumption.
-func (c *Client) Close() (err error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if len(c.topics) != 0 {
- err = c.broker.Unsubscribe(c.upstream, c.topics...)
- }
-
- close(c.upstream)
- return err
-}
diff --git a/plugins/broadcast/root/client_test.go b/plugins/broadcast/root/client_test.go
deleted file mode 100644
index 52a50d57..00000000
--- a/plugins/broadcast/root/client_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package broadcast
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_Client_Topics(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.Equal(t, []string{}, client.Topics())
-
- assert.NoError(t, client.Subscribe("topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, client.Subscribe("topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, br.broker.Subscribe(client.upstream, "topic"))
- assert.Equal(t, []string{"topic"}, client.Topics())
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Unsubscribe("topic"))
- assert.NoError(t, client.Unsubscribe("topic"))
- assert.NoError(t, br.broker.Unsubscribe(client.upstream, "topic"))
-
- assert.Equal(t, []string{}, client.Topics())
-}
-
-func Test_Client_Patterns(t *testing.T) {
- br, _, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- assert.Equal(t, []string{}, client.Patterns())
-
- assert.NoError(t, client.SubscribePattern("topic/*"))
- assert.Equal(t, []string{"topic/*"}, client.Patterns())
-
- assert.NoError(t, br.broker.SubscribePattern(client.upstream, "topic/*"))
- assert.Equal(t, []string{"topic/*"}, client.Patterns())
-
- assert.NoError(t, br.Broker().Publish(newMessage("topic/1", "hello1")))
- assert.Equal(t, `hello1`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.UnsubscribePattern("topic/*"))
- assert.NoError(t, br.broker.UnsubscribePattern(client.upstream, "topic/*"))
-
- assert.Equal(t, []string{}, client.Patterns())
-}
diff --git a/plugins/broadcast/root/config.go b/plugins/broadcast/root/config.go
deleted file mode 100644
index 8c732441..00000000
--- a/plugins/broadcast/root/config.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package broadcast
-
-import (
- "errors"
-
- "github.com/go-redis/redis/v8"
-)
-
-// Config configures the broadcast extension.
-type Config struct {
- // RedisConfig configures redis broker.
- Redis *RedisConfig
-}
-
-// Hydrate reads the configuration values from the source configuration.
-//func (c *Config) Hydrate(cfg service.Config) error {
-// if err := cfg.Unmarshal(c); err != nil {
-// return err
-// }
-//
-// if c.Redis != nil {
-// return c.Redis.isValid()
-// }
-//
-// return nil
-//}
-
-// InitDefaults enables in memory broadcast configuration.
-func (c *Config) InitDefaults() error {
- return nil
-}
-
-// RedisConfig configures redis broker.
-type RedisConfig struct {
- // Addr of the redis server.
- Addr string
-
- // Password to redis server.
- Password string
-
- // DB index.
- DB int
-}
-
-// clusterOptions
-func (cfg *RedisConfig) redisClient() redis.UniversalClient {
- return redis.NewClient(&redis.Options{
- Addr: cfg.Addr,
- Password: cfg.Password,
- PoolSize: 2,
- })
-}
-
-// check if redis config is valid.
-func (cfg *RedisConfig) isValid() error {
- if cfg.Addr == "" {
- return errors.New("redis addr is required")
- }
-
- return nil
-}
diff --git a/plugins/broadcast/root/config_test.go b/plugins/broadcast/root/config_test.go
deleted file mode 100644
index 28191c6b..00000000
--- a/plugins/broadcast/root/config_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package broadcast
-
-import (
- "encoding/json"
- "testing"
-
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-type testCfg struct {
- rpc string
- broadcast string
- target string
-}
-
-func (cfg *testCfg) Get(name string) service.Config {
- if name == ID {
- return &testCfg{target: cfg.broadcast}
- }
-
- if name == rpc.ID {
- return &testCfg{target: cfg.rpc}
- }
-
- return nil
-}
-
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- return json.Unmarshal([]byte(cfg.target), out)
-}
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &testCfg{target: `{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_OK(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Redis_Error(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path","redis":{}}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Redis_OK(t *testing.T) {
- cfg := &testCfg{target: `{"path":"/path","redis":{"addr":"localhost:6379"}}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
deleted file mode 100644
index 91137f8b..00000000
--- a/plugins/broadcast/root/router.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package broadcast
-
-//import "github.com/gobwas/glob"
-
-// Router performs internal message routing to multiple subscribers.
-type Router struct {
- wildcard map[string]wildcard
- routes map[string][]chan *Message
-}
-
-// wildcard handles number of topics via glob pattern.
-type wildcard struct {
- //glob glob.Glob
- upstream []chan *Message
-}
-
-// helper for blocking join/leave flow
-type subscriber struct {
- upstream chan *Message
- done chan error
- topics []string
- pattern string
-}
-
-// NewRouter creates new topic and pattern router.
-func NewRouter() *Router {
- return &Router{
- wildcard: make(map[string]wildcard),
- routes: make(map[string][]chan *Message),
- }
-}
-
-// Dispatch to all connected topics.
-func (r *Router) Dispatch(msg *Message) {
- for _, w := range r.wildcard {
- if w.glob.Match(msg.Topic) {
- for _, upstream := range w.upstream {
- upstream <- msg
- }
- }
- }
-
- if routes, ok := r.routes[msg.Topic]; ok {
- for _, upstream := range routes {
- upstream <- msg
- }
- }
-}
-
-// Subscribe to topic and return list of newly assigned topics.
-func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
- newTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- r.routes[topic] = []chan *Message{upstream}
- if !r.collapsed(topic) {
- newTopics = append(newTopics, topic)
- }
- continue
- }
-
- joined := false
- for _, up := range r.routes[topic] {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- r.routes[topic] = append(r.routes[topic], upstream)
- }
- }
-
- return newTopics
-}
-
-// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
-func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
- dropTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- // no such topic, ignore
- continue
- }
-
- for i := range r.routes[topic] {
- if r.routes[topic][i] == upstream {
- r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
- break
- }
- }
-
- if len(r.routes[topic]) == 0 {
- delete(r.routes, topic)
-
- // standalone empty subscription
- if !r.collapsed(topic) {
- dropTopics = append(dropTopics, topic)
- }
- }
- }
-
- return dropTopics
-}
-
-// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
-// case if blob is invalid.
-func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
- if w, ok := r.wildcard[pattern]; ok {
- joined := false
- for _, up := range w.upstream {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- w.upstream = append(w.upstream, upstream)
- }
-
- return nil, nil
- }
-
- g, err := glob.Compile(pattern)
- if err != nil {
- return nil, err
- }
-
- r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
-
- return []string{pattern}, nil
-}
-
-// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
-func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
- // todo: store and return collapsed topics
-
- w, ok := r.wildcard[pattern]
- if !ok {
- // no such pattern
- return nil
- }
-
- for i, up := range w.upstream {
- if up == upstream {
- w.upstream[i] = w.upstream[len(w.upstream)-1]
- w.upstream[len(w.upstream)-1] = nil
- w.upstream = w.upstream[:len(w.upstream)-1]
-
- if len(w.upstream) == 0 {
- delete(r.wildcard, pattern)
- return []string{pattern}
- }
- }
- }
-
- return nil
-}
-
-func (r *Router) collapsed(topic string) bool {
- for _, w := range r.wildcard {
- if w.glob.Match(topic) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/broadcast/root/rpc.go b/plugins/broadcast/root/rpc.go
deleted file mode 100644
index 5604a574..00000000
--- a/plugins/broadcast/root/rpc.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package broadcast
-
-import "golang.org/x/sync/errgroup"
-
-type rpcService struct {
- svc *Service
-}
-
-// Publish Messages.
-func (r *rpcService) Publish(msg []*Message, ok *bool) error {
- *ok = true
- return r.svc.Publish(msg...)
-}
-
-// Publish Messages in async mode. Blocks until get an err or nil from publish
-func (r *rpcService) PublishAsync(msg []*Message, ok *bool) error {
- *ok = true
- g := &errgroup.Group{}
-
- g.Go(func() error {
- return r.svc.Publish(msg...)
- })
-
- return g.Wait()
-}
diff --git a/plugins/broadcast/root/rpc_test.go b/plugins/broadcast/root/rpc_test.go
deleted file mode 100644
index 157c4e70..00000000
--- a/plugins/broadcast/root/rpc_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package broadcast
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestRPC_Broadcast(t *testing.T) {
- br, rpc, c := setup(`{}`)
- defer c.Stop()
-
- client := br.NewClient()
- defer client.Close()
-
- rcpClient, err := rpc.Client()
- assert.NoError(t, err)
-
- // must not be delivered
- ok := false
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello1"`)},
- &ok,
- ))
- assert.True(t, ok)
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello1"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello1"`, readStr(<-client.Channel()))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello2"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello2"`, readStr(<-client.Channel()))
-
- assert.NoError(t, client.Unsubscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello3"`)},
- &ok,
- ))
- assert.True(t, ok)
-
- assert.NoError(t, client.Subscribe("topic"))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.Publish",
- []*Message{newMessage("topic", `"hello4"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello4"`, readStr(<-client.Channel()))
-
- assert.NoError(t, rcpClient.Call(
- "broadcast.PublishAsync",
- []*Message{newMessage("topic", `"hello5"`)},
- &ok,
- ))
- assert.True(t, ok)
- assert.Equal(t, `"hello5"`, readStr(<-client.Channel()))
-}
diff --git a/plugins/broadcast/root/service.go b/plugins/broadcast/root/service.go
deleted file mode 100644
index 8b175b3e..00000000
--- a/plugins/broadcast/root/service.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package broadcast
-
-import (
- "errors"
- "sync"
-
- "github.com/spiral/roadrunner/service/rpc"
-)
-
-// ID defines public service name.
-const ID = "broadcast"
-
-// Service manages even broadcasting and websocket interface.
-type Service struct {
- // service and broker configuration
- cfg *Config
-
- // broker
- mu sync.Mutex
- broker Broker
-}
-
-// Init service.
-func (s *Service) Init(cfg *Config, rpc *rpc.Service) (ok bool, err error) {
- s.cfg = cfg
-
- if rpc != nil {
- if err := rpc.Register(ID, &rpcService{svc: s}); err != nil {
- return false, err
- }
- }
-
- s.mu.Lock()
- if s.cfg.Redis != nil {
- if s.broker, err = redisBroker(s.cfg.Redis); err != nil {
- return false, err
- }
- } else {
- s.broker = memoryBroker()
- }
- s.mu.Unlock()
-
- return true, nil
-}
-
-// Serve broadcast broker.
-func (s *Service) Serve() (err error) {
- return s.broker.Serve()
-}
-
-// Stop closes broadcast broker.
-func (s *Service) Stop() {
- broker := s.Broker()
- if broker != nil {
- broker.Stop()
- }
-}
-
-// Broker returns associated broker.
-func (s *Service) Broker() Broker {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- return s.broker
-}
-
-// NewClient returns single connected client with ability to consume or produce into associated topic(svc).
-func (s *Service) NewClient() *Client {
- return &Client{
- upstream: make(chan *Message),
- broker: s.Broker(),
- topics: make([]string, 0),
- patterns: make([]string, 0),
- }
-}
-
-// Publish one or multiple Channel.
-func (s *Service) Publish(msg ...*Message) error {
- broker := s.Broker()
- if broker == nil {
- return errors.New("no stopped broker")
- }
-
- return s.Broker().Publish(msg...)
-}
diff --git a/plugins/broadcast/root/service_test.go b/plugins/broadcast/root/service_test.go
deleted file mode 100644
index 10b924cc..00000000
--- a/plugins/broadcast/root/service_test.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package broadcast
-
-import (
- "fmt"
- "strings"
- "testing"
- "time"
-
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
-)
-
-var rpcPort = 6010
-
-func setup(cfg string) (*Service, *rpc.Service, service.Container) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{
- broadcast: cfg,
- rpc: fmt.Sprintf(`{"listen":"tcp://:%v"}`, rpcPort),
- })
-
- rpcPort++
-
- if err != nil {
- panic(err)
- }
-
- go func() {
- err = c.Serve()
- if err != nil {
- panic(err)
- }
- }()
- time.Sleep(time.Millisecond * 100)
-
- b, _ := c.Get(ID)
- br := b.(*Service)
-
- r, _ := c.Get(rpc.ID)
- rp := r.(*rpc.Service)
-
- return br, rp, c
-}
-
-func readStr(m *Message) string {
- return strings.TrimRight(string(m.Payload), "\n")
-}
-
-func newMessage(t, m string) *Message {
- return &Message{Topic: t, Payload: []byte(m)}
-}
-
-func TestService_Publish(t *testing.T) {
- svc := &Service{}
- assert.Error(t, svc.Publish(nil))
-}
diff --git a/plugins/broadcast/root/tests/.rr.yaml b/plugins/broadcast/root/tests/.rr.yaml
deleted file mode 100644
index c35a12fc..00000000
--- a/plugins/broadcast/root/tests/.rr.yaml
+++ /dev/null
@@ -1,2 +0,0 @@
-broadcast:
- redis.addr: "localhost:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php b/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
deleted file mode 100644
index d6014bf0..00000000
--- a/plugins/broadcast/root/tests/Broadcast/BroadcastTest.php
+++ /dev/null
@@ -1,56 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-namespace Spiral\Broadcast\Tests;
-
-use PHPUnit\Framework\TestCase;
-use Spiral\Broadcast\Broadcast;
-use Spiral\Broadcast\Exception\BroadcastException;
-use Spiral\Broadcast\Message;
-use Spiral\Goridge\RPC;
-use Spiral\Goridge\SocketRelay;
-
-class BroadcastTest extends TestCase
-{
- public function testBroadcast(): void
- {
- $rpc = new RPC(new SocketRelay('localhost', 6001));
- $br = new Broadcast($rpc);
-
- $br->publish(
- new Message('tests/topic', 'hello'),
- new Message('tests/123', ['key' => 'value'])
- );
-
- while (filesize(__DIR__ . '/../log.txt') < 40) {
- clearstatcache(true, __DIR__ . '/../log.txt');
- usleep(1000);
- }
-
- clearstatcache(true, __DIR__ . '/../log.txt');
- $content = file_get_contents(__DIR__ . '/../log.txt');
-
- $this->assertSame('tests/topic: "hello"
-tests/123: {"key":"value"}
-', $content);
- }
-
- public function testBroadcastException(): void
- {
- $rpc = new RPC(new SocketRelay('localhost', 6002));
- $br = new Broadcast($rpc);
-
- $this->expectException(BroadcastException::class);
- $br->publish(
- new Message('topic', 'hello')
- );
- }
-}
diff --git a/plugins/broadcast/root/tests/Broadcast/MessageTest.php b/plugins/broadcast/root/tests/Broadcast/MessageTest.php
deleted file mode 100644
index dd9e1cc3..00000000
--- a/plugins/broadcast/root/tests/Broadcast/MessageTest.php
+++ /dev/null
@@ -1,24 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-namespace Spiral\Broadcast\Tests;
-
-use PHPUnit\Framework\TestCase;
-use Spiral\Broadcast\Message;
-
-class MessageTest extends TestCase
-{
- public function testSerialize(): void
- {
- $m = new Message('topic', ['hello' => 'world']);
- $this->assertSame('{"topic":"topic","payload":{"hello":"world"}}', json_encode($m));
- }
-}
diff --git a/plugins/broadcast/root/tests/bootstrap.php b/plugins/broadcast/root/tests/bootstrap.php
deleted file mode 100644
index d0dfb88b..00000000
--- a/plugins/broadcast/root/tests/bootstrap.php
+++ /dev/null
@@ -1,15 +0,0 @@
-<?php
-
-/**
- * Spiral Framework, SpiralScout LLC.
- *
- * @author Anton Titov (Wolfy-J)
- */
-
-declare(strict_types=1);
-
-error_reporting(E_ALL | E_STRICT);
-ini_set('display_errors', 'stderr');
-
-//Composer
-require dirname(__DIR__) . '/vendor_php/autoload.php';
diff --git a/plugins/broadcast/root/tests/docker-compose.yml b/plugins/broadcast/root/tests/docker-compose.yml
deleted file mode 100644
index 123aa9b9..00000000
--- a/plugins/broadcast/root/tests/docker-compose.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-version: '3'
-
-services:
- redis:
- image: 'bitnami/redis:latest'
- environment:
- - ALLOW_EMPTY_PASSWORD=yes
- ports:
- - "6379:6379" \ No newline at end of file
diff --git a/plugins/broadcast/root/tests/go-client.go b/plugins/broadcast/root/tests/go-client.go
deleted file mode 100644
index 21442a01..00000000
--- a/plugins/broadcast/root/tests/go-client.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package main
-
-import (
- "fmt"
- "os"
-
- "github.com/spiral/broadcast/v2"
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/service/rpc"
- "golang.org/x/sync/errgroup"
-)
-
-type logService struct {
- broadcast *broadcast.Service
- stop chan interface{}
-}
-
-func (l *logService) Init(service *broadcast.Service) (bool, error) {
- l.broadcast = service
-
- return true, nil
-}
-
-func (l *logService) Serve() error {
- l.stop = make(chan interface{})
-
- client := l.broadcast.NewClient()
- if err := client.SubscribePattern("tests/*"); err != nil {
- return err
- }
-
- logFile, _ := os.Create("log.txt")
-
- g := &errgroup.Group{}
- g.Go(func() error {
- for msg := range client.Channel() {
- _, err := logFile.Write([]byte(fmt.Sprintf(
- "%s: %s\n",
- msg.Topic,
- string(msg.Payload),
- )))
- if err != nil {
- return err
- }
-
- err = logFile.Sync()
- if err != nil {
- return err
- }
- }
- return nil
- })
-
- <-l.stop
- err := logFile.Close()
- if err != nil {
- return err
- }
-
- err = client.Close()
- if err != nil {
- return err
- }
-
- return g.Wait()
-}
-
-func (l *logService) Stop() {
- close(l.stop)
-}
-
-func main() {
- rr.Container.Register(rpc.ID, &rpc.Service{})
- rr.Container.Register(broadcast.ID, &broadcast.Service{})
- rr.Container.Register("log", &logService{})
-
- rr.Execute()
-}