summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-12 23:53:24 +0300
committerWolfy-J <[email protected]>2018-06-12 23:53:24 +0300
commit9a33c61229cae83ce00cd86ad619fa1825b95421 (patch)
tree88526b79de593821b937e1023e923a3611cdff5f
parentcb23c4958cee817a0aaf0a81e1f4ca315e492d84 (diff)
better multithread control
-rw-r--r--server.go24
-rw-r--r--static_pool.go18
2 files changed, 31 insertions, 11 deletions
diff --git a/server.go b/server.go
index 7a115a81..b46e65d5 100644
--- a/server.go
+++ b/server.go
@@ -28,9 +28,6 @@ type Server struct {
// configures server, pool, cmd creation and factory.
cfg *ServerConfig
- // observes pool events (can be attached to multiple pools at the same time)
- lsn func(event int, ctx interface{})
-
// protects pool while the re-configuration
mu sync.Mutex
@@ -42,6 +39,10 @@ type Server struct {
// currently active pool instance
pool Pool
+
+ // observes pool events (can be attached to multiple pools at the same time)
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
}
// NewServer creates new router. Make sure to call configure before the usage.
@@ -51,6 +52,9 @@ func NewServer(cfg *ServerConfig) *Server {
// AddListener attaches server event watcher.
func (srv *Server) Listen(l func(event int, ctx interface{})) {
+ srv.mul.Lock()
+ defer srv.mul.Unlock()
+
srv.lsn = l
}
@@ -146,7 +150,11 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error {
// Reset resets the state of underlying pool and rebuilds all of it's workers.
func (srv *Server) Reset() error {
- return srv.Reconfigure(srv.cfg)
+ srv.mu.Lock()
+ cfg := srv.cfg
+ srv.mu.Unlock()
+
+ return srv.Reconfigure(cfg)
}
// Workers returns worker list associated with the server pool.
@@ -190,7 +198,11 @@ func (srv *Server) poolListener(event int, ctx interface{}) {
// throw invokes event handler if any.
func (srv *Server) throw(event int, ctx interface{}) {
- if srv.lsn != nil {
- srv.lsn(event, ctx)
+ srv.mul.Lock()
+ lsn := srv.lsn
+ srv.mul.Unlock()
+
+ if lsn != nil {
+ lsn(event, ctx)
}
}
diff --git a/static_pool.go b/static_pool.go
index 24665c01..19fc1d13 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -22,9 +22,6 @@ type StaticPool struct {
// worker command creator
cmd func() *exec.Cmd
- // lsn is optional callback to handle worker create/destruct/error events.
- lsn func(event int, ctx interface{})
-
// creates and connects to workers
factory Factory
@@ -42,6 +39,10 @@ type StaticPool struct {
// pool is being destroying
inDestroy int32
+
+ // lsn is optional callback to handle worker create/destruct/error events.
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -75,6 +76,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
// AddListener attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
+ p.mul.Lock()
+ defer p.mul.Unlock()
+
p.lsn = l
}
@@ -268,7 +272,11 @@ func (p *StaticPool) destroying() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
- if p.lsn != nil {
- p.lsn(event, ctx)
+ p.mul.Lock()
+ lsn := p.lsn
+ p.mul.Unlock()
+
+ if lsn != nil {
+ lsn(event, ctx)
}
}