diff options
-rw-r--r-- | server.go | 135 | ||||
-rw-r--r-- | service/http/handler.go | 7 | ||||
-rw-r--r-- | static_pool.go | 7 |
3 files changed, 73 insertions, 76 deletions
@@ -51,55 +51,55 @@ func NewServer(cfg *ServerConfig) *Server { } // AddListener attaches server event watcher. -func (srv *Server) Listen(l func(event int, ctx interface{})) { - srv.mul.Lock() - defer srv.mul.Unlock() +func (s *Server) Listen(l func(event int, ctx interface{})) { + s.mul.Lock() + defer s.mul.Unlock() - srv.lsn = l + s.lsn = l } // Start underlying worker pool, configure factory and command provider. -func (srv *Server) Start() (err error) { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Start() (err error) { + s.mu.Lock() + defer s.mu.Unlock() - if srv.factory, err = srv.cfg.makeFactory(); err != nil { + if s.factory, err = s.cfg.makeFactory(); err != nil { return err } - if srv.pool, err = NewPool(srv.cfg.makeCommand(), srv.factory, *srv.cfg.Pool); err != nil { + if s.pool, err = NewPool(s.cfg.makeCommand(), s.factory, *s.cfg.Pool); err != nil { return err } - srv.pool.Listen(srv.poolListener) - srv.started = true - srv.throw(EventServerStart, srv) + s.pool.Listen(s.poolListener) + s.started = true + s.throw(EventServerStart, s) return nil } // Stop underlying worker pool and close the factory. -func (srv *Server) Stop() { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Stop() { + s.mu.Lock() + defer s.mu.Unlock() - if !srv.started { + if !s.started { return } - srv.throw(EventPoolDestruct, srv.pool) - srv.pool.Destroy() - srv.factory.Close() + s.throw(EventPoolDestruct, s.pool) + s.pool.Destroy() + s.factory.Close() - srv.factory = nil - srv.pool = nil - srv.started = false - srv.throw(EventServerStop, srv) + s.factory = nil + s.pool = nil + s.started = false + s.throw(EventServerStop, s) } // Exec one task with given payload and context, returns result or error. -func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { - pool := srv.Pool() +func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error) { + pool := s.Pool() if pool == nil { return nil, fmt.Errorf("no associared pool") } @@ -109,38 +109,38 @@ func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { // Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory // and relay settings. -func (srv *Server) Reconfigure(cfg *ServerConfig) error { - srv.mu.Lock() - if !srv.started { - srv.cfg = cfg - srv.mu.Unlock() +func (s *Server) Reconfigure(cfg *ServerConfig) error { + s.mu.Lock() + if !s.started { + s.cfg = cfg + s.mu.Unlock() return nil } - srv.mu.Unlock() + s.mu.Unlock() - if srv.cfg.Differs(cfg) { + if s.cfg.Differs(cfg) { return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") } - srv.mu.Lock() - previous := srv.pool - srv.mu.Unlock() + s.mu.Lock() + previous := s.pool + s.mu.Unlock() - pool, err := NewPool(cfg.makeCommand(), srv.factory, *cfg.Pool) - srv.pool.Listen(srv.poolListener) + pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + s.pool.Listen(s.poolListener) if err != nil { return err } - srv.mu.Lock() - srv.cfg.Pool, srv.pool = cfg.Pool, pool - srv.mu.Unlock() + s.mu.Lock() + s.cfg.Pool, s.pool = cfg.Pool, pool + s.mu.Unlock() - srv.throw(EventPoolConstruct, pool) + s.throw(EventPoolConstruct, pool) if previous != nil { go func(previous Pool) { - srv.throw(EventPoolDestruct, previous) + s.throw(EventPoolDestruct, previous) previous.Destroy() }(previous) } @@ -149,17 +149,17 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error { } // Reset resets the state of underlying pool and rebuilds all of it's workers. -func (srv *Server) Reset() error { - srv.mu.Lock() - cfg := srv.cfg - srv.mu.Unlock() +func (s *Server) Reset() error { + s.mu.Lock() + cfg := s.cfg + s.mu.Unlock() - return srv.Reconfigure(cfg) + return s.Reconfigure(cfg) } // Workers returns worker list associated with the server pool. -func (srv *Server) Workers() (workers []*Worker) { - p := srv.Pool() +func (s *Server) Workers() (workers []*Worker) { + p := s.Pool() if p == nil { return nil } @@ -168,41 +168,40 @@ func (srv *Server) Workers() (workers []*Worker) { } // Pool returns active pool or error. -func (srv *Server) Pool() Pool { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Pool() Pool { + s.mu.Lock() + defer s.mu.Unlock() - return srv.pool + return s.pool } // AddListener pool events. -func (srv *Server) poolListener(event int, ctx interface{}) { +func (s *Server) poolListener(event int, ctx interface{}) { // bypassing to user specified lsn - srv.throw(event, ctx) + s.throw(event, ctx) if event == EventPoolError { // pool failure, rebuilding - if err := srv.Reset(); err != nil { - srv.mu.Lock() - defer srv.mu.Unlock() + if err := s.Reset(); err != nil { + s.mu.Lock() + defer s.mu.Unlock() - srv.started = false - srv.pool = nil - srv.factory = nil + s.started = false + s.pool = nil + s.factory = nil // everything is dead, this is recoverable but heavy state - srv.throw(EventServerFailure, err) + s.throw(EventServerFailure, err) } } } // throw invokes event handler if any. -func (srv *Server) throw(event int, ctx interface{}) { - srv.mul.Lock() - lsn := srv.lsn - srv.mul.Unlock() +func (s *Server) throw(event int, ctx interface{}) { + s.mul.Lock() + defer s.mul.Unlock() - if lsn != nil { - lsn(event, ctx) + if s.lsn != nil { + s.lsn(event, ctx) } } diff --git a/service/http/handler.go b/service/http/handler.go index 8c604127..2d749207 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -113,10 +113,9 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) // throw invokes event srv if any. func (h *Handler) throw(event int, ctx interface{}) { h.mul.Lock() - lsn := h.lsn - h.mul.Unlock() + defer h.mul.Unlock() - if lsn != nil { - lsn(event, ctx) + if h.lsn != nil { + h.lsn(event, ctx) } } diff --git a/static_pool.go b/static_pool.go index 19fc1d13..80847f6e 100644 --- a/static_pool.go +++ b/static_pool.go @@ -273,10 +273,9 @@ func (p *StaticPool) destroying() bool { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { p.mul.Lock() - lsn := p.lsn - p.mul.Unlock() + defer p.mul.Unlock() - if lsn != nil { - lsn(event, ctx) + if p.lsn != nil { + p.lsn(event, ctx) } } |