diff options
author | Wolfy-J <[email protected]> | 2018-06-06 15:55:28 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-06 15:55:28 +0300 |
commit | e863c6cdcf7c318fb251e096bf92812ed98ea03c (patch) | |
tree | 9eb2cd049dfae0482211a6a212d28cc48304f109 /server.go | |
parent | 67b88c9914d922314ce7cd8d3624d64812647758 (diff) |
better server re-configuration
Diffstat (limited to 'server.go')
-rw-r--r-- | server.go | 204 |
1 files changed, 112 insertions, 92 deletions
@@ -1,75 +1,88 @@ package roadrunner import ( - "fmt" - "os/exec" "sync" + "os/exec" + "fmt" + "errors" ) const ( // EventNewPool triggered when server creates new pool. - EventNewPool = 60 + EventStart = iota + 128 + + // EventNewPool triggered when server creates new pool. + EventStop + + // EventNewPool triggered when server creates new pool. + EventNewPool // EventDestroyPool triggered when server destroys existed pool. - EventDestroyPool = 61 + EventDestroyPool ) // Service manages pool creation and swapping. type Server struct { - // worker command creator - cmd func() *exec.Cmd - - // defines server wide configuration, behaviour and timeouts. - config ServerConfig + // configures server, pool, cmd creation and factory. + cfg *ServerConfig // observes pool events (can be attached to multiple pools at the same time) observer func(event int, ctx interface{}) - // creates and connects to workers - factory Factory - - // protects pool while the switch + // protects pool while the re-configuration mu sync.Mutex - // pool behaviour - cfg Config + // indicates that server was started + started bool + + // worker command creator + cmd func() *exec.Cmd + + // creates and connects to workers + factory Factory // currently active pool instance pool Pool } // NewServer creates new router. Make sure to call configure before the usage. -func NewServer(cmd func() *exec.Cmd, factory Factory) *Server { - return &Server{ - cmd: cmd, - factory: factory, - } +func NewServer(cfg *ServerConfig) *Server { + return &Server{cfg: cfg} } -// Configure configures underlying pool and destroys it's previous version if any. -func (r *Server) Configure(cfg Config) error { - r.mu.Lock() - previous := r.pool - r.mu.Unlock() - - pool, err := NewPool(r.cmd, r.factory, cfg) - if err != nil { - return err +// Reconfigure re-configures underlying pool and destroys it's previous version if any. +func (srv *Server) Reconfigure(cfg *ServerConfig) error { + srv.mu.Lock() + if !srv.started { + srv.cfg = cfg + return nil } + srv.mu.Unlock() - r.throw(EventNewPool, pool) + // we are not allowing factory or cmd changes while the server is running. + if srv.cfg.Differs(cfg) { + return errors.New("config change while running server (only pool config change is allowed)") + } - r.mu.Lock() + srv.mu.Lock() + previous := srv.pool + srv.mu.Unlock() - r.cfg, r.pool = cfg, pool - r.pool.Observe(r.poolObserver) + pool, err := NewPool(srv.cmd, srv.factory, *cfg.Pool) + if err != nil { + return err + } + srv.throw(EventNewPool, pool) - r.mu.Unlock() + srv.mu.Lock() + srv.cfg, srv.pool = cfg, pool + srv.pool.Observe(srv.poolObserver) + srv.mu.Unlock() if previous != nil { - go func(p Pool) { - r.throw(EventDestroyPool, p) - p.Destroy() + go func(previous Pool) { + srv.throw(EventDestroyPool, previous) + previous.Destroy() }(previous) } @@ -77,93 +90,100 @@ func (r *Server) Configure(cfg Config) error { } // Reset resets the state of underlying pool and rebuilds all of it's workers. -func (r *Server) Reset() error { - return r.Configure(r.cfg) +func (srv *Server) Reset() error { + return srv.Reconfigure(srv.cfg) } // Observe attaches event watcher to the router. -func (r *Server) Observe(o func(event int, ctx interface{})) { - r.observer = o +func (srv *Server) Observe(o func(event int, ctx interface{})) { + srv.observer = o } -// Pool returns active pool or error. -func (r *Server) Pool() Pool { - r.mu.Lock() - defer r.mu.Unlock() - - return r.pool -} +// Start underlying worker pool, configure factory and command provider. +func (srv *Server) Start() (err error) { + srv.mu.Lock() + defer srv.mu.Unlock() -// Exec one task with given payload and context, returns result or error. -func (r *Server) Exec(rqs *Payload) (rsp *Payload, err error) { - pool := r.Pool() - if pool == nil { - return nil, fmt.Errorf("no associared pool") + if srv.cmd, err = srv.cfg.makeCommand(); err != nil { + return err } - return pool.Exec(rqs) -} + if srv.factory, err = srv.cfg.makeFactory(); err != nil { + return err + } -// Workers returns worker list associated with the pool. -func (r *Server) Workers() (workers []*Worker) { - pool := r.Pool() - if pool == nil { - return nil + if srv.pool, err = NewPool(srv.cmd, srv.factory, *srv.cfg.Pool); err != nil { + return err } - return pool.Workers() + srv.pool.Observe(srv.poolObserver) + srv.started = true + return nil } -// Destroy all underlying pools and workers workers (but let them to complete the task). -func (r *Server) Destroy() { - r.mu.Lock() - defer r.mu.Unlock() +// Stop underlying worker pool and close the factory. +func (srv *Server) Stop() error { + srv.mu.Lock() + defer srv.mu.Unlock() - if r.pool == nil { - return + if !srv.started { + return nil } - go func(p Pool) { - r.throw(EventDestroyPool, p) - p.Destroy() - }(r.pool) + srv.throw(EventDestroyPool, srv.pool) + srv.pool.Destroy() + srv.factory.Close() + + srv.cmd = nil + srv.factory = nil + srv.pool = nil + srv.started = false - r.pool = nil + return nil } -// Start the server underlying worker pool and factory. -func (r *Server) Start() error { - if r.factory != nil { - //todo: already have started - return nil +// Exec one task with given payload and context, returns result or error. +func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { + pool := srv.Pool() + if pool == nil { + return nil, fmt.Errorf("no associared pool") } - return nil + return pool.Exec(rqs) } -// Stop the server and close underlying factory. -func (r *Server) Stop() { - r.mu.Lock() - defer r.mu.Unlock() +// Workers returns worker list associated with the server pool. +func (srv *Server) Workers() (workers []*Worker) { + p := srv.Pool() + if p == nil { + return nil + } - r.factory.Close() - r.factory = nil + return p.Workers() } -// throw invokes event handler if any. -func (r *Server) throw(event int, ctx interface{}) { - if r.observer != nil { - r.observer(event, ctx) - } +// Pool returns active pool or error. +func (srv *Server) Pool() Pool { + srv.mu.Lock() + defer srv.mu.Unlock() + + return srv.pool } // Observe pool events. -func (r *Server) poolObserver(event int, ctx interface{}) { +func (srv *Server) poolObserver(event int, ctx interface{}) { // bypassing to user specified observer - r.throw(event, ctx) + srv.throw(event, ctx) if event == EventPoolError { // pool failure, rebuilding - r.Reset() + srv.Reset() + } +} + +// throw invokes event handler if any. +func (srv *Server) throw(event int, ctx interface{}) { + if srv.observer != nil { + srv.observer(event, ctx) } } |