summaryrefslogtreecommitdiff
path: root/server.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 12:28:11 +0300
committerWolfy-J <[email protected]>2019-05-03 12:28:11 +0300
commit9b886ceab6a63e8264f2b2c9d35d76628085dbd6 (patch)
tree5671b75b1e8f04900b54462f6e647a52e0ff6d8b /server.go
parent62250fde949d2511b1e030c570bafe2ef3ed2c2d (diff)
added pool watcher capability
Diffstat (limited to 'server.go')
-rw-r--r--server.go49
1 files changed, 44 insertions, 5 deletions
diff --git a/server.go b/server.go
index 2672a792..397898f2 100644
--- a/server.go
+++ b/server.go
@@ -37,9 +37,13 @@ type Server struct {
// creates and connects to workers
factory Factory
+ // associated pool watcher
+ watcher Watcher
+
// currently active pool instance
- mup sync.Mutex
- pool Pool
+ mup sync.Mutex
+ pool Pool
+ pWatcher Watcher
// observes pool events (can be attached to multiple pools at the same time)
mul sync.Mutex
@@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) {
s.lsn = l
}
+// Listen attaches server event watcher.
+func (s *Server) Watch(w Watcher) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.watcher = w
+
+ s.mul.Lock()
+ if s.pWatcher != nil && s.pool != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+ s.mul.Unlock()
+}
+
// Start underlying worker pool, configure factory and command provider.
func (s *Server) Start() (err error) {
s.mu.Lock()
@@ -72,6 +91,10 @@ func (s *Server) Start() (err error) {
return err
}
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+
s.pool.Listen(s.poolListener)
s.started = true
s.throw(EventServerStart, s)
@@ -79,7 +102,7 @@ func (s *Server) Start() (err error) {
return nil
}
-// Stop underlying worker pool and close the factory.
+// Detach underlying worker pool and close the factory.
func (s *Server) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -89,6 +112,12 @@ func (s *Server) Stop() {
}
s.throw(EventPoolDestruct, s.pool)
+
+ if s.pWatcher != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = nil
+ }
+
s.pool.Destroy()
s.factory.Close()
@@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
previous := s.pool
+ pWatcher := s.pWatcher
s.mu.Unlock()
pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
@@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
s.cfg.Pool, s.pool = cfg.Pool, pool
+
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(pool)
+ }
+
s.mu.Unlock()
s.throw(EventPoolConstruct, pool)
if previous != nil {
- go func(previous Pool) {
+ go func(previous Pool, pWatcher Watcher) {
s.throw(EventPoolDestruct, previous)
+ if pWatcher != nil {
+ pWatcher.Detach()
+ }
+
previous.Destroy()
- }(previous)
+ }(previous, pWatcher)
}
return nil