summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server.go135
-rw-r--r--service/http/handler.go7
-rw-r--r--static_pool.go7
3 files changed, 73 insertions, 76 deletions
diff --git a/server.go b/server.go
index b46e65d5..45b4597c 100644
--- a/server.go
+++ b/server.go
@@ -51,55 +51,55 @@ func NewServer(cfg *ServerConfig) *Server {
}
// AddListener attaches server event watcher.
-func (srv *Server) Listen(l func(event int, ctx interface{})) {
- srv.mul.Lock()
- defer srv.mul.Unlock()
+func (s *Server) Listen(l func(event int, ctx interface{})) {
+ s.mul.Lock()
+ defer s.mul.Unlock()
- srv.lsn = l
+ s.lsn = l
}
// Start underlying worker pool, configure factory and command provider.
-func (srv *Server) Start() (err error) {
- srv.mu.Lock()
- defer srv.mu.Unlock()
+func (s *Server) Start() (err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
- if srv.factory, err = srv.cfg.makeFactory(); err != nil {
+ if s.factory, err = s.cfg.makeFactory(); err != nil {
return err
}
- if srv.pool, err = NewPool(srv.cfg.makeCommand(), srv.factory, *srv.cfg.Pool); err != nil {
+ if s.pool, err = NewPool(s.cfg.makeCommand(), s.factory, *s.cfg.Pool); err != nil {
return err
}
- srv.pool.Listen(srv.poolListener)
- srv.started = true
- srv.throw(EventServerStart, srv)
+ s.pool.Listen(s.poolListener)
+ s.started = true
+ s.throw(EventServerStart, s)
return nil
}
// Stop underlying worker pool and close the factory.
-func (srv *Server) Stop() {
- srv.mu.Lock()
- defer srv.mu.Unlock()
+func (s *Server) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
- if !srv.started {
+ if !s.started {
return
}
- srv.throw(EventPoolDestruct, srv.pool)
- srv.pool.Destroy()
- srv.factory.Close()
+ s.throw(EventPoolDestruct, s.pool)
+ s.pool.Destroy()
+ s.factory.Close()
- srv.factory = nil
- srv.pool = nil
- srv.started = false
- srv.throw(EventServerStop, srv)
+ s.factory = nil
+ s.pool = nil
+ s.started = false
+ s.throw(EventServerStop, s)
}
// Exec one task with given payload and context, returns result or error.
-func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
- pool := srv.Pool()
+func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
+ pool := s.Pool()
if pool == nil {
return nil, fmt.Errorf("no associared pool")
}
@@ -109,38 +109,38 @@ func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory
// and relay settings.
-func (srv *Server) Reconfigure(cfg *ServerConfig) error {
- srv.mu.Lock()
- if !srv.started {
- srv.cfg = cfg
- srv.mu.Unlock()
+func (s *Server) Reconfigure(cfg *ServerConfig) error {
+ s.mu.Lock()
+ if !s.started {
+ s.cfg = cfg
+ s.mu.Unlock()
return nil
}
- srv.mu.Unlock()
+ s.mu.Unlock()
- if srv.cfg.Differs(cfg) {
+ if s.cfg.Differs(cfg) {
return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
}
- srv.mu.Lock()
- previous := srv.pool
- srv.mu.Unlock()
+ s.mu.Lock()
+ previous := s.pool
+ s.mu.Unlock()
- pool, err := NewPool(cfg.makeCommand(), srv.factory, *cfg.Pool)
- srv.pool.Listen(srv.poolListener)
+ pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
+ s.pool.Listen(s.poolListener)
if err != nil {
return err
}
- srv.mu.Lock()
- srv.cfg.Pool, srv.pool = cfg.Pool, pool
- srv.mu.Unlock()
+ s.mu.Lock()
+ s.cfg.Pool, s.pool = cfg.Pool, pool
+ s.mu.Unlock()
- srv.throw(EventPoolConstruct, pool)
+ s.throw(EventPoolConstruct, pool)
if previous != nil {
go func(previous Pool) {
- srv.throw(EventPoolDestruct, previous)
+ s.throw(EventPoolDestruct, previous)
previous.Destroy()
}(previous)
}
@@ -149,17 +149,17 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error {
}
// Reset resets the state of underlying pool and rebuilds all of it's workers.
-func (srv *Server) Reset() error {
- srv.mu.Lock()
- cfg := srv.cfg
- srv.mu.Unlock()
+func (s *Server) Reset() error {
+ s.mu.Lock()
+ cfg := s.cfg
+ s.mu.Unlock()
- return srv.Reconfigure(cfg)
+ return s.Reconfigure(cfg)
}
// Workers returns worker list associated with the server pool.
-func (srv *Server) Workers() (workers []*Worker) {
- p := srv.Pool()
+func (s *Server) Workers() (workers []*Worker) {
+ p := s.Pool()
if p == nil {
return nil
}
@@ -168,41 +168,40 @@ func (srv *Server) Workers() (workers []*Worker) {
}
// Pool returns active pool or error.
-func (srv *Server) Pool() Pool {
- srv.mu.Lock()
- defer srv.mu.Unlock()
+func (s *Server) Pool() Pool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
- return srv.pool
+ return s.pool
}
// AddListener pool events.
-func (srv *Server) poolListener(event int, ctx interface{}) {
+func (s *Server) poolListener(event int, ctx interface{}) {
// bypassing to user specified lsn
- srv.throw(event, ctx)
+ s.throw(event, ctx)
if event == EventPoolError {
// pool failure, rebuilding
- if err := srv.Reset(); err != nil {
- srv.mu.Lock()
- defer srv.mu.Unlock()
+ if err := s.Reset(); err != nil {
+ s.mu.Lock()
+ defer s.mu.Unlock()
- srv.started = false
- srv.pool = nil
- srv.factory = nil
+ s.started = false
+ s.pool = nil
+ s.factory = nil
// everything is dead, this is recoverable but heavy state
- srv.throw(EventServerFailure, err)
+ s.throw(EventServerFailure, err)
}
}
}
// throw invokes event handler if any.
-func (srv *Server) throw(event int, ctx interface{}) {
- srv.mul.Lock()
- lsn := srv.lsn
- srv.mul.Unlock()
+func (s *Server) throw(event int, ctx interface{}) {
+ s.mul.Lock()
+ defer s.mul.Unlock()
- if lsn != nil {
- lsn(event, ctx)
+ if s.lsn != nil {
+ s.lsn(event, ctx)
}
}
diff --git a/service/http/handler.go b/service/http/handler.go
index 8c604127..2d749207 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -113,10 +113,9 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error)
// throw invokes event srv if any.
func (h *Handler) throw(event int, ctx interface{}) {
h.mul.Lock()
- lsn := h.lsn
- h.mul.Unlock()
+ defer h.mul.Unlock()
- if lsn != nil {
- lsn(event, ctx)
+ if h.lsn != nil {
+ h.lsn(event, ctx)
}
}
diff --git a/static_pool.go b/static_pool.go
index 19fc1d13..80847f6e 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -273,10 +273,9 @@ func (p *StaticPool) destroying() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
p.mul.Lock()
- lsn := p.lsn
- p.mul.Unlock()
+ defer p.mul.Unlock()
- if lsn != nil {
- lsn(event, ctx)
+ if p.lsn != nil {
+ p.lsn(event, ctx)
}
}