From d2c7ee8c0a070b9790d5552d3f607ca01e1ab798 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Wed, 13 Jun 2018 13:02:38 +0300 Subject: tests! --- server.go | 135 +++++++++++++++++++++++++++++++------------------------------- 1 file changed, 67 insertions(+), 68 deletions(-) (limited to 'server.go') diff --git a/server.go b/server.go index b46e65d5..45b4597c 100644 --- a/server.go +++ b/server.go @@ -51,55 +51,55 @@ func NewServer(cfg *ServerConfig) *Server { } // AddListener attaches server event watcher. -func (srv *Server) Listen(l func(event int, ctx interface{})) { - srv.mul.Lock() - defer srv.mul.Unlock() +func (s *Server) Listen(l func(event int, ctx interface{})) { + s.mul.Lock() + defer s.mul.Unlock() - srv.lsn = l + s.lsn = l } // Start underlying worker pool, configure factory and command provider. -func (srv *Server) Start() (err error) { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Start() (err error) { + s.mu.Lock() + defer s.mu.Unlock() - if srv.factory, err = srv.cfg.makeFactory(); err != nil { + if s.factory, err = s.cfg.makeFactory(); err != nil { return err } - if srv.pool, err = NewPool(srv.cfg.makeCommand(), srv.factory, *srv.cfg.Pool); err != nil { + if s.pool, err = NewPool(s.cfg.makeCommand(), s.factory, *s.cfg.Pool); err != nil { return err } - srv.pool.Listen(srv.poolListener) - srv.started = true - srv.throw(EventServerStart, srv) + s.pool.Listen(s.poolListener) + s.started = true + s.throw(EventServerStart, s) return nil } // Stop underlying worker pool and close the factory. -func (srv *Server) Stop() { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Stop() { + s.mu.Lock() + defer s.mu.Unlock() - if !srv.started { + if !s.started { return } - srv.throw(EventPoolDestruct, srv.pool) - srv.pool.Destroy() - srv.factory.Close() + s.throw(EventPoolDestruct, s.pool) + s.pool.Destroy() + s.factory.Close() - srv.factory = nil - srv.pool = nil - srv.started = false - srv.throw(EventServerStop, srv) + s.factory = nil + s.pool = nil + s.started = false + s.throw(EventServerStop, s) } // Exec one task with given payload and context, returns result or error. -func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { - pool := srv.Pool() +func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error) { + pool := s.Pool() if pool == nil { return nil, fmt.Errorf("no associared pool") } @@ -109,38 +109,38 @@ func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { // Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory // and relay settings. -func (srv *Server) Reconfigure(cfg *ServerConfig) error { - srv.mu.Lock() - if !srv.started { - srv.cfg = cfg - srv.mu.Unlock() +func (s *Server) Reconfigure(cfg *ServerConfig) error { + s.mu.Lock() + if !s.started { + s.cfg = cfg + s.mu.Unlock() return nil } - srv.mu.Unlock() + s.mu.Unlock() - if srv.cfg.Differs(cfg) { + if s.cfg.Differs(cfg) { return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") } - srv.mu.Lock() - previous := srv.pool - srv.mu.Unlock() + s.mu.Lock() + previous := s.pool + s.mu.Unlock() - pool, err := NewPool(cfg.makeCommand(), srv.factory, *cfg.Pool) - srv.pool.Listen(srv.poolListener) + pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) + s.pool.Listen(s.poolListener) if err != nil { return err } - srv.mu.Lock() - srv.cfg.Pool, srv.pool = cfg.Pool, pool - srv.mu.Unlock() + s.mu.Lock() + s.cfg.Pool, s.pool = cfg.Pool, pool + s.mu.Unlock() - srv.throw(EventPoolConstruct, pool) + s.throw(EventPoolConstruct, pool) if previous != nil { go func(previous Pool) { - srv.throw(EventPoolDestruct, previous) + s.throw(EventPoolDestruct, previous) previous.Destroy() }(previous) } @@ -149,17 +149,17 @@ func (srv *Server) Reconfigure(cfg *ServerConfig) error { } // Reset resets the state of underlying pool and rebuilds all of it's workers. -func (srv *Server) Reset() error { - srv.mu.Lock() - cfg := srv.cfg - srv.mu.Unlock() +func (s *Server) Reset() error { + s.mu.Lock() + cfg := s.cfg + s.mu.Unlock() - return srv.Reconfigure(cfg) + return s.Reconfigure(cfg) } // Workers returns worker list associated with the server pool. -func (srv *Server) Workers() (workers []*Worker) { - p := srv.Pool() +func (s *Server) Workers() (workers []*Worker) { + p := s.Pool() if p == nil { return nil } @@ -168,41 +168,40 @@ func (srv *Server) Workers() (workers []*Worker) { } // Pool returns active pool or error. -func (srv *Server) Pool() Pool { - srv.mu.Lock() - defer srv.mu.Unlock() +func (s *Server) Pool() Pool { + s.mu.Lock() + defer s.mu.Unlock() - return srv.pool + return s.pool } // AddListener pool events. -func (srv *Server) poolListener(event int, ctx interface{}) { +func (s *Server) poolListener(event int, ctx interface{}) { // bypassing to user specified lsn - srv.throw(event, ctx) + s.throw(event, ctx) if event == EventPoolError { // pool failure, rebuilding - if err := srv.Reset(); err != nil { - srv.mu.Lock() - defer srv.mu.Unlock() + if err := s.Reset(); err != nil { + s.mu.Lock() + defer s.mu.Unlock() - srv.started = false - srv.pool = nil - srv.factory = nil + s.started = false + s.pool = nil + s.factory = nil // everything is dead, this is recoverable but heavy state - srv.throw(EventServerFailure, err) + s.throw(EventServerFailure, err) } } } // throw invokes event handler if any. -func (srv *Server) throw(event int, ctx interface{}) { - srv.mul.Lock() - lsn := srv.lsn - srv.mul.Unlock() +func (s *Server) throw(event int, ctx interface{}) { + s.mul.Lock() + defer s.mul.Unlock() - if lsn != nil { - lsn(event, ctx) + if s.lsn != nil { + s.lsn(event, ctx) } } -- cgit v1.2.3