From 9b886ceab6a63e8264f2b2c9d35d76628085dbd6 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Fri, 3 May 2019 12:28:11 +0300 Subject: added pool watcher capability --- server.go | 49 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 5 deletions(-) (limited to 'server.go') diff --git a/server.go b/server.go index 2672a792..397898f2 100644 --- a/server.go +++ b/server.go @@ -37,9 +37,13 @@ type Server struct { // creates and connects to workers factory Factory + // associated pool watcher + watcher Watcher + // currently active pool instance - mup sync.Mutex - pool Pool + mup sync.Mutex + pool Pool + pWatcher Watcher // observes pool events (can be attached to multiple pools at the same time) mul sync.Mutex @@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) { s.lsn = l } +// Listen attaches server event watcher. +func (s *Server) Watch(w Watcher) { + s.mu.Lock() + defer s.mu.Unlock() + + s.watcher = w + + s.mul.Lock() + if s.pWatcher != nil && s.pool != nil { + s.pWatcher.Detach() + s.pWatcher = s.watcher.Attach(s.pool) + } + s.mul.Unlock() +} + // Start underlying worker pool, configure factory and command provider. func (s *Server) Start() (err error) { s.mu.Lock() @@ -72,6 +91,10 @@ func (s *Server) Start() (err error) { return err } + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(s.pool) + } + s.pool.Listen(s.poolListener) s.started = true s.throw(EventServerStart, s) @@ -79,7 +102,7 @@ func (s *Server) Start() (err error) { return nil } -// Stop underlying worker pool and close the factory. +// Detach underlying worker pool and close the factory. func (s *Server) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +112,12 @@ func (s *Server) Stop() { } s.throw(EventPoolDestruct, s.pool) + + if s.pWatcher != nil { + s.pWatcher.Detach() + s.pWatcher = nil + } + s.pool.Destroy() s.factory.Close() @@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() previous := s.pool + pWatcher := s.pWatcher s.mu.Unlock() pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) @@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() s.cfg.Pool, s.pool = cfg.Pool, pool + + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(pool) + } + s.mu.Unlock() s.throw(EventPoolConstruct, pool) if previous != nil { - go func(previous Pool) { + go func(previous Pool, pWatcher Watcher) { s.throw(EventPoolDestruct, previous) + if pWatcher != nil { + pWatcher.Detach() + } + previous.Destroy() - }(previous) + }(previous, pWatcher) } return nil -- cgit v1.2.3