summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
committerValery Piashchynski <[email protected]>2021-08-18 00:27:08 +0300
commit300560b44451bd9d5241ccdbaea3576760968ef2 (patch)
tree88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins
parent65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff)
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/memory/pubsub.go34
-rw-r--r--plugins/redis/pubsub/channel.go4
-rw-r--r--plugins/redis/pubsub/pubsub.go10
-rw-r--r--plugins/websockets/plugin.go26
4 files changed, 48 insertions, 26 deletions
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index c79f3eb0..fd30eb54 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -1,8 +1,10 @@
package memory
import (
+ "context"
"sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
+ p.RLock()
+ defer p.RUnlock()
+ // push only messages, which topics are subscibed
+ // TODO better???
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
}
return nil, nil
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go
index eef5a7b9..a1655ab2 100644
--- a/plugins/redis/pubsub/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -92,6 +92,6 @@ func (r *redisChannel) stop() error {
return nil
}
-func (r *redisChannel) message() *pubsub.Message {
- return <-r.out
+func (r *redisChannel) message() chan *pubsub.Message {
+ return r.out
}
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 95a9f6dd..c9ad3d58 100644
--- a/plugins/redis/pubsub/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return p.channel.message(), nil
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("redis_driver_next")
+ select {
+ case msg := <-p.channel.message():
+ return msg, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index a7db0f83..395b056f 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -58,6 +58,10 @@ type Plugin struct {
// server which produces commands to the pool
server server.Server
+ // stop receiving messages
+ cancel context.CancelFunc
+ ctx context.Context
+
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
@@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.log = log
p.broadcaster = b
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.ctx = ctx
+ p.cancel = cancel
return nil
}
@@ -130,17 +138,17 @@ func (p *Plugin) Serve() chan error {
// we need here only Reader part of the interface
go func(ps pubsub.Reader) {
for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- errors.E(op, err)
+ data, err := ps.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
- p.workersPool.Queue(data)
+
+ errCh <- errors.E(op, err)
+ return
}
+
+ p.workersPool.Queue(data)
}
}(p.subReader)
@@ -150,6 +158,8 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
+ // cancel context
+ p.cancel()
p.Lock()
if p.phpPool == nil {
p.Unlock()