diff options
author | Wolfy-J <[email protected]> | 2019-05-03 12:28:11 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-03 12:28:11 +0300 |
commit | 9b886ceab6a63e8264f2b2c9d35d76628085dbd6 (patch) | |
tree | 5671b75b1e8f04900b54462f6e647a52e0ff6d8b /server.go | |
parent | 62250fde949d2511b1e030c570bafe2ef3ed2c2d (diff) |
added pool watcher capability
Diffstat (limited to 'server.go')
-rw-r--r-- | server.go | 49 |
1 files changed, 44 insertions, 5 deletions
@@ -37,9 +37,13 @@ type Server struct { // creates and connects to workers factory Factory + // associated pool watcher + watcher Watcher + // currently active pool instance - mup sync.Mutex - pool Pool + mup sync.Mutex + pool Pool + pWatcher Watcher // observes pool events (can be attached to multiple pools at the same time) mul sync.Mutex @@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) { s.lsn = l } +// Listen attaches server event watcher. +func (s *Server) Watch(w Watcher) { + s.mu.Lock() + defer s.mu.Unlock() + + s.watcher = w + + s.mul.Lock() + if s.pWatcher != nil && s.pool != nil { + s.pWatcher.Detach() + s.pWatcher = s.watcher.Attach(s.pool) + } + s.mul.Unlock() +} + // Start underlying worker pool, configure factory and command provider. func (s *Server) Start() (err error) { s.mu.Lock() @@ -72,6 +91,10 @@ func (s *Server) Start() (err error) { return err } + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(s.pool) + } + s.pool.Listen(s.poolListener) s.started = true s.throw(EventServerStart, s) @@ -79,7 +102,7 @@ func (s *Server) Start() (err error) { return nil } -// Stop underlying worker pool and close the factory. +// Detach underlying worker pool and close the factory. func (s *Server) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +112,12 @@ func (s *Server) Stop() { } s.throw(EventPoolDestruct, s.pool) + + if s.pWatcher != nil { + s.pWatcher.Detach() + s.pWatcher = nil + } + s.pool.Destroy() s.factory.Close() @@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() previous := s.pool + pWatcher := s.pWatcher s.mu.Unlock() pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) @@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() s.cfg.Pool, s.pool = cfg.Pool, pool + + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(pool) + } + s.mu.Unlock() s.throw(EventPoolConstruct, pool) if previous != nil { - go func(previous Pool) { + go func(previous Pool, pWatcher Watcher) { s.throw(EventPoolDestruct, previous) + if pWatcher != nil { + pWatcher.Detach() + } + previous.Destroy() - }(previous) + }(previous, pWatcher) } return nil |