diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
commit | 300560b44451bd9d5241ccdbaea3576760968ef2 (patch) | |
tree | 88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins | |
parent | 65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff) |
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/memory/pubsub.go | 34 | ||||
-rw-r--r-- | plugins/redis/pubsub/channel.go | 4 | ||||
-rw-r--r-- | plugins/redis/pubsub/pubsub.go | 10 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 26 |
4 files changed, 48 insertions, 26 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index c79f3eb0..fd30eb54 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -1,8 +1,10 @@ package memory import ( + "context" "sync" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } } -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - msg := <-p.pushCh - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("pubsub_memory") + select { + case msg := <-p.pushCh: + if msg == nil { + return nil, nil + } - // push only messages, which topics are subscibed - // TODO better??? - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(msg.Topic); ok { - return msg, nil + p.RLock() + defer p.RUnlock() + // push only messages, which topics are subscibed + // TODO better??? + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil + } + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) } return nil, nil diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go index eef5a7b9..a1655ab2 100644 --- a/plugins/redis/pubsub/channel.go +++ b/plugins/redis/pubsub/channel.go @@ -92,6 +92,6 @@ func (r *redisChannel) stop() error { return nil } -func (r *redisChannel) message() *pubsub.Message { - return <-r.out +func (r *redisChannel) message() chan *pubsub.Message { + return r.out } diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go index 95a9f6dd..c9ad3d58 100644 --- a/plugins/redis/pubsub/pubsub.go +++ b/plugins/redis/pubsub/pubsub.go @@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return p.channel.message(), nil +func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { + const op = errors.Op("redis_driver_next") + select { + case msg := <-p.channel.message(): + return msg, nil + case <-ctx.Done(): + return nil, errors.E(op, errors.TimeOut, ctx.Err()) + } } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index a7db0f83..395b056f 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -58,6 +58,10 @@ type Plugin struct { // server which produces commands to the pool server server.Server + // stop receiving messages + cancel context.CancelFunc + ctx context.Context + // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn } @@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.log = log p.broadcaster = b + + ctx, cancel := context.WithCancel(context.Background()) + p.ctx = ctx + p.cancel = cancel return nil } @@ -130,17 +138,17 @@ func (p *Plugin) Serve() chan error { // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { - select { - case <-p.serveExit: - return - default: - data, err := ps.Next() - if err != nil { - errCh <- errors.E(op, err) + data, err := ps.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - p.workersPool.Queue(data) + + errCh <- errors.E(op, err) + return } + + p.workersPool.Queue(data) } }(p.subReader) @@ -150,6 +158,8 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() + // cancel context + p.cancel() p.Lock() if p.phpPool == nil { p.Unlock() |