summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go26
1 files changed, 18 insertions, 8 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index a7db0f83..395b056f 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -58,6 +58,10 @@ type Plugin struct {
// server which produces commands to the pool
server server.Server
+ // stop receiving messages
+ cancel context.CancelFunc
+ ctx context.Context
+
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
@@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.log = log
p.broadcaster = b
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.ctx = ctx
+ p.cancel = cancel
return nil
}
@@ -130,17 +138,17 @@ func (p *Plugin) Serve() chan error {
// we need here only Reader part of the interface
go func(ps pubsub.Reader) {
for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- errors.E(op, err)
+ data, err := ps.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
- p.workersPool.Queue(data)
+
+ errCh <- errors.E(op, err)
+ return
}
+
+ p.workersPool.Queue(data)
}
}(p.subReader)
@@ -150,6 +158,8 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
+ // cancel context
+ p.cancel()
p.Lock()
if p.phpPool == nil {
p.Unlock()