diff options
-rw-r--r-- | plugins/broadcast/plugin.go | 19 | ||||
-rw-r--r-- | plugins/redis/channel.go (renamed from plugins/redis/fanin.go) | 40 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 12 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 2 | ||||
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 194 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-global.yaml | 46 |
6 files changed, 244 insertions, 69 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 04a4fb80..6ddef806 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -116,7 +116,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -164,17 +164,28 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { return nil, errors.E(op, err) } - // save the pubsub under a config key - // + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection p.publishers[configKey] = ps return ps, nil + + // then try global if local does not exist case p.cfgPlugin.Has(redis): ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // save the pubsub + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection p.publishers[configKey] = ps return ps, nil } diff --git a/plugins/redis/fanin.go b/plugins/redis/channel.go index 40a99d20..5817853c 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/channel.go @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type FanIn struct { +type redisChannel struct { sync.Mutex // redis client @@ -26,9 +26,9 @@ type FanIn struct { exit chan struct{} } -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { out := make(chan *pubsub.Message, 100) - fi := &FanIn{ + fi := &redisChannel{ out: out, client: redisClient, pubsub: redisClient.Subscribe(context.Background()), @@ -42,9 +42,9 @@ func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) if err != nil { return errors.E(op, err) } @@ -52,46 +52,46 @@ func (fi *FanIn) sub(topics ...string) error { } // read reads messages from the pubsub subscription -func (fi *FanIn) read() { +func (r *redisChannel) read() { for { select { // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface + // it should be compatible with the pubsub.Message structure // payload should be in the redis.message.payload field - case msg, ok := <-fi.pubsub.Channel(): + case msg, ok := <-r.pubsub.Channel(): // channel closed if !ok { return } - fi.out <- &pubsub.Message{ + r.out <- &pubsub.Message{ Topic: msg.Channel, Payload: utils.AsBytes(msg.Payload), } - case <-fi.exit: + case <-r.exit: return } } } -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) return nil } -func (fi *FanIn) consume() <-chan *pubsub.Message { - return fi.out +func (r *redisChannel) message() *pubsub.Message { + return <-r.out } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 7253511d..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -16,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, statusCmd.Err() } - ps.fanin = newFanIn(ps.universalClient, log) + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() @@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { // Next return next message func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return <-p.fanin.consume(), nil + return p.channel.message(), nil } diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 00e053ec..752ba3ce 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -93,7 +93,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit wp.subscriber.Connections(msg.Topic, res) if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topic) + wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) wp.put(res) continue } diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index d6510058..2cd4b451 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -273,10 +273,10 @@ func TestBroadcastSameSubscriber(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3) - t.Run("PublishHelloFoo2", BroadcastPublishFoo2) - t.Run("PublishHelloFoo3", BroadcastPublishFoo3) - t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3) + t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002")) + t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002")) + t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002")) + t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002")) time.Sleep(time.Second * 4) stopCh <- struct{}{} @@ -284,61 +284,179 @@ func TestBroadcastSameSubscriber(t *testing.T) { wg.Wait() } -func BroadcastPublishFooFoo2Foo3(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6002") - if err != nil { - t.Fatal(err) +func TestBroadcastSameSubscriberGlobal(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-global.yaml", + Prefix: "rr", } - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1) + mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin4: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin5: {foo hello}`).Times(3) + mockLogger.EXPECT().Info(`plugin6: {foo hello}`).Times(3) + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + + // test - redis + // test2 - redis (port 6378) + // test3 - memory + // test4 - memory + &plugins.Plugin1{}, // foo, foo2, foo3 test + &plugins.Plugin2{}, // foo, test + &plugins.Plugin3{}, // foo, test2 + &plugins.Plugin4{}, // foo, test3 + &plugins.Plugin5{}, // foo, test4 + &plugins.Plugin6{}, // foo, test3 + ) + + assert.NoError(t, err) - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) + err = cont.Init() if err != nil { t.Fatal(err) } -} -func BroadcastPublishFoo2(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6002") + ch, err := cont.Serve() if err != nil { t.Fatal(err) } - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo"), ret) - if err != nil { - t.Fatal(err) - } + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + + t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003")) + t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003")) + t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003")) + t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003")) + + time.Sleep(time.Second * 4) + stopCh <- struct{}{} + + wg.Wait() } -func BroadcastPublishFoo3(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6002") - if err != nil { - t.Fatal(err) + +func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) + if err != nil { + t.Fatal(err) + } } +} - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) +func BroadcastPublishFoo2(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } - ret := &websocketsv1.Response{} - err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo3"), ret) - if err != nil { - t.Fatal(err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo"), ret) + if err != nil { + t.Fatal(err) + } } } -func BroadcastPublishAsyncFooFoo2Foo3(t *testing.T) { - conn, err := net.Dial("tcp", "127.0.0.1:6002") - if err != nil { - t.Fatal(err) + +func BroadcastPublishFoo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("broadcast.Publish", makeMessage([]byte("hello"), "foo3"), ret) + if err != nil { + t.Fatal(err) + } } +} +func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:"+port) + if err != nil { + t.Fatal(err) + } - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - ret := &websocketsv1.Response{} - err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) - if err != nil { - t.Fatal(err) + ret := &websocketsv1.Response{} + err = client.Call("broadcast.PublishAsync", makeMessage([]byte("hello"), "foo", "foo2", "foo3"), ret) + if err != nil { + t.Fatal(err) + } } } diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml new file mode 100644 index 00000000..2ca97055 --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml @@ -0,0 +1,46 @@ +rpc: + listen: tcp://127.0.0.1:6003 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21543 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +broadcast: + test: + driver: redis + test2: + driver: redis + addrs: + - "localhost:6378" + test3: + driver: memory + test4: + driver: memory + + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error |