diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 00:27:08 +0300 |
commit | 300560b44451bd9d5241ccdbaea3576760968ef2 (patch) | |
tree | 88d7d862707ae135f8c345e59111f2b2b9dff60f /plugins/websockets | |
parent | 65a70f5d15fb0a1b1f787ff7f06b3a299dab0f96 (diff) |
Update broadcast tests, improve stop mechanism.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/plugin.go | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index a7db0f83..395b056f 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -58,6 +58,10 @@ type Plugin struct { // server which produces commands to the pool server server.Server + // stop receiving messages + cancel context.CancelFunc + ctx context.Context + // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn } @@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.log = log p.broadcaster = b + + ctx, cancel := context.WithCancel(context.Background()) + p.ctx = ctx + p.cancel = cancel return nil } @@ -130,17 +138,17 @@ func (p *Plugin) Serve() chan error { // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { - select { - case <-p.serveExit: - return - default: - data, err := ps.Next() - if err != nil { - errCh <- errors.E(op, err) + data, err := ps.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - p.workersPool.Queue(data) + + errCh <- errors.E(op, err) + return } + + p.workersPool.Queue(data) } }(p.subReader) @@ -150,6 +158,8 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() + // cancel context + p.cancel() p.Lock() if p.phpPool == nil { p.Unlock() |