diff options
author | Wolfy-J <[email protected]> | 2018-06-12 23:53:24 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-12 23:53:24 +0300 |
commit | 9a33c61229cae83ce00cd86ad619fa1825b95421 (patch) | |
tree | 88526b79de593821b937e1023e923a3611cdff5f | |
parent | cb23c4958cee817a0aaf0a81e1f4ca315e492d84 (diff) |
better multithread control
-rw-r--r-- | server.go | 24 | ||||
-rw-r--r-- | static_pool.go | 18 |
2 files changed, 31 insertions, 11 deletions
@@ -28,9 +28,6 @@ type Server struct { // configures server, pool, cmd creation and factory. cfg *ServerConfig - // observes pool events (can be attached to multiple pools at the same time) - lsn func(event int, ctx interface{}) - // protects pool while the re-configuration mu sync.Mutex @@ -42,6 +39,10 @@ type Server struct { // currently active pool instance pool Pool + + // observes pool events (can be attached to multiple pools at the same time) + mul sync.Mutex + lsn func(event int, ctx interface{}) } // NewServer creates new router. Make sure to call configure before the usage. @@ -51,6 +52,9 @@ func NewServer(cfg *ServerConfig) *Server { // AddListener attaches server event watcher. func (srv *Server) Listen(l func(event int, ctx interface{})) { + srv.mul.Lock() + defer srv.mul.Unlock() + srv.lsn = l } @@ -146,7 +150,11 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error { // Reset resets the state of underlying pool and rebuilds all of it's workers. func (srv *Server) Reset() error { - return srv.Reconfigure(srv.cfg) + srv.mu.Lock() + cfg := srv.cfg + srv.mu.Unlock() + + return srv.Reconfigure(cfg) } // Workers returns worker list associated with the server pool. @@ -190,7 +198,11 @@ func (srv *Server) poolListener(event int, ctx interface{}) { // throw invokes event handler if any. func (srv *Server) throw(event int, ctx interface{}) { - if srv.lsn != nil { - srv.lsn(event, ctx) + srv.mul.Lock() + lsn := srv.lsn + srv.mul.Unlock() + + if lsn != nil { + lsn(event, ctx) } } diff --git a/static_pool.go b/static_pool.go index 24665c01..19fc1d13 100644 --- a/static_pool.go +++ b/static_pool.go @@ -22,9 +22,6 @@ type StaticPool struct { // worker command creator cmd func() *exec.Cmd - // lsn is optional callback to handle worker create/destruct/error events. - lsn func(event int, ctx interface{}) - // creates and connects to workers factory Factory @@ -42,6 +39,10 @@ type StaticPool struct { // pool is being destroying inDestroy int32 + + // lsn is optional callback to handle worker create/destruct/error events. + mul sync.Mutex + lsn func(event int, ctx interface{}) } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -75,6 +76,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er // AddListener attaches pool event watcher. func (p *StaticPool) Listen(l func(event int, ctx interface{})) { + p.mul.Lock() + defer p.mul.Unlock() + p.lsn = l } @@ -268,7 +272,11 @@ func (p *StaticPool) destroying() bool { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { - if p.lsn != nil { - p.lsn(event, ctx) + p.mul.Lock() + lsn := p.lsn + p.mul.Unlock() + + if lsn != nil { + lsn(event, ctx) } } |