summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-29 11:27:49 +0300
committerValery Piashchynski <[email protected]>2021-05-29 11:27:49 +0300
commit09b982813f8825f776abf20fb16c6085439ca4ba (patch)
tree1c4593bdc42503616b06f32bb6ee676cca38515a /plugins/websockets/plugin.go
parentfcda08498e8f914bbd0798da898818cd5d0e4348 (diff)
- Update channel plugin interfaces
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go12
1 files changed, 9 insertions, 3 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 2a060716..b3495e77 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -3,6 +3,7 @@ package websockets
import (
"net/http"
"sync"
+ "sync/atomic"
"time"
"github.com/fasthttp/websocket"
@@ -39,6 +40,7 @@ type Plugin struct {
// GO workers pool
workersPool *pool.WorkersPool
+ stopped uint64
hub channel.Hub
}
@@ -59,6 +61,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, channel channel.
p.storage = storage.NewStorage()
p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
p.hub = channel
+ p.stopped = 0
return nil
}
@@ -84,6 +87,7 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
+ atomic.AddUint64(&p.stopped, 1)
p.workersPool.Stop()
return nil
}
@@ -118,7 +122,11 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
return
}
- p.mu.Lock()
+
+ if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) {
+ // plugin stopped
+ return
+ }
r = attributes.Init(r)
@@ -133,8 +141,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}
}
- p.mu.Unlock()
-
// connection upgrader
upgraded := websocket.Upgrader{
HandshakeTimeout: time.Second * 60,