diff options
author | Wolfy-J <[email protected]> | 2018-06-06 15:55:28 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-06 15:55:28 +0300 |
commit | e863c6cdcf7c318fb251e096bf92812ed98ea03c (patch) | |
tree | 9eb2cd049dfae0482211a6a212d28cc48304f109 | |
parent | 67b88c9914d922314ce7cd8d3624d64812647758 (diff) |
better server re-configuration
-rw-r--r-- | cmd/_____/server.go | 64 | ||||
-rw-r--r-- | config.go | 2 | ||||
-rw-r--r-- | payload.go | 6 | ||||
-rw-r--r-- | server.go | 204 | ||||
-rw-r--r-- | server_config.go | 12 | ||||
-rw-r--r-- | server_test.go | 1 | ||||
-rw-r--r-- | service/registry.go | 4 |
7 files changed, 196 insertions, 97 deletions
diff --git a/cmd/_____/server.go b/cmd/_____/server.go new file mode 100644 index 00000000..5542e7c9 --- /dev/null +++ b/cmd/_____/server.go @@ -0,0 +1,64 @@ +package roadrunner + +import ( + "os/exec" + "sync" +) + +const ( + // EventNewPool triggered when server creates new pool. + EventNewPool = 60 + + // EventDestroyPool triggered when server destroys existed pool. + EventDestroyPool = 61 +) + +// Service manages pool creation and swapping. +type Server struct { + // configures server, pool, cmd creation and factory. + scfg *ServerConfig + + // worker command creator + cmd func() *exec.Cmd + + // observes pool events (can be attached to multiple pools at the same time) + observer func(event int, ctx interface{}) + + // creates and connects to workers + factory Factory + + // protects pool while the switch + mu sync.Mutex +} + +// todo: do assignment + +// Reconfigure configures underlying pool and destroys it's previous version if any. +func (r *Server) Configure(cfg Config) error { + r.mu.Lock() + previous := r.pool + r.mu.Unlock() + + pool, err := NewPool(r.cmd, r.factory, cfg) + if err != nil { + return err + } + + r.throw(EventNewPool, pool) + + r.mu.Lock() + + r.cfg, r.pool = cfg, pool + r.pool.Observe(r.poolObserver) + + r.mu.Unlock() + + if previous != nil { + go func(p Pool) { + r.throw(EventDestroyPool, p) + p.Destroy() + }(previous) + } + + return nil +} @@ -25,7 +25,7 @@ type Config struct { DestroyTimeout time.Duration } -// Configure returns error if cfg not valid +// Reconfigure returns error if cfg not valid func (cfg *Config) Valid() error { if cfg.NumWorkers == 0 { return fmt.Errorf("cfg.NumWorkers must be set") @@ -3,11 +3,13 @@ package roadrunner // Payload carries binary header and body to workers and // back to the server. type Payload struct { - // Context represent payload context, might be omitted + // Context represent payload context, might be omitted. Context []byte - // body contains binary payload to be processed by worker + // body contains binary payload to be processed by worker. Body []byte + + // todo: io.Reader support for streamed requests and responses. } // String returns payload body as string @@ -1,75 +1,88 @@ package roadrunner import ( - "fmt" - "os/exec" "sync" + "os/exec" + "fmt" + "errors" ) const ( // EventNewPool triggered when server creates new pool. - EventNewPool = 60 + EventStart = iota + 128 + + // EventNewPool triggered when server creates new pool. + EventStop + + // EventNewPool triggered when server creates new pool. + EventNewPool // EventDestroyPool triggered when server destroys existed pool. - EventDestroyPool = 61 + EventDestroyPool ) // Service manages pool creation and swapping. type Server struct { - // worker command creator - cmd func() *exec.Cmd - - // defines server wide configuration, behaviour and timeouts. - config ServerConfig + // configures server, pool, cmd creation and factory. + cfg *ServerConfig // observes pool events (can be attached to multiple pools at the same time) observer func(event int, ctx interface{}) - // creates and connects to workers - factory Factory - - // protects pool while the switch + // protects pool while the re-configuration mu sync.Mutex - // pool behaviour - cfg Config + // indicates that server was started + started bool + + // worker command creator + cmd func() *exec.Cmd + + // creates and connects to workers + factory Factory // currently active pool instance pool Pool } // NewServer creates new router. Make sure to call configure before the usage. -func NewServer(cmd func() *exec.Cmd, factory Factory) *Server { - return &Server{ - cmd: cmd, - factory: factory, - } +func NewServer(cfg *ServerConfig) *Server { + return &Server{cfg: cfg} } -// Configure configures underlying pool and destroys it's previous version if any. -func (r *Server) Configure(cfg Config) error { - r.mu.Lock() - previous := r.pool - r.mu.Unlock() - - pool, err := NewPool(r.cmd, r.factory, cfg) - if err != nil { - return err +// Reconfigure re-configures underlying pool and destroys it's previous version if any. +func (srv *Server) Reconfigure(cfg *ServerConfig) error { + srv.mu.Lock() + if !srv.started { + srv.cfg = cfg + return nil } + srv.mu.Unlock() - r.throw(EventNewPool, pool) + // we are not allowing factory or cmd changes while the server is running. + if srv.cfg.Differs(cfg) { + return errors.New("config change while running server (only pool config change is allowed)") + } - r.mu.Lock() + srv.mu.Lock() + previous := srv.pool + srv.mu.Unlock() - r.cfg, r.pool = cfg, pool - r.pool.Observe(r.poolObserver) + pool, err := NewPool(srv.cmd, srv.factory, *cfg.Pool) + if err != nil { + return err + } + srv.throw(EventNewPool, pool) - r.mu.Unlock() + srv.mu.Lock() + srv.cfg, srv.pool = cfg, pool + srv.pool.Observe(srv.poolObserver) + srv.mu.Unlock() if previous != nil { - go func(p Pool) { - r.throw(EventDestroyPool, p) - p.Destroy() + go func(previous Pool) { + srv.throw(EventDestroyPool, previous) + previous.Destroy() }(previous) } @@ -77,93 +90,100 @@ func (r *Server) Configure(cfg Config) error { } // Reset resets the state of underlying pool and rebuilds all of it's workers. -func (r *Server) Reset() error { - return r.Configure(r.cfg) +func (srv *Server) Reset() error { + return srv.Reconfigure(srv.cfg) } // Observe attaches event watcher to the router. -func (r *Server) Observe(o func(event int, ctx interface{})) { - r.observer = o +func (srv *Server) Observe(o func(event int, ctx interface{})) { + srv.observer = o } -// Pool returns active pool or error. -func (r *Server) Pool() Pool { - r.mu.Lock() - defer r.mu.Unlock() - - return r.pool -} +// Start underlying worker pool, configure factory and command provider. +func (srv *Server) Start() (err error) { + srv.mu.Lock() + defer srv.mu.Unlock() -// Exec one task with given payload and context, returns result or error. -func (r *Server) Exec(rqs *Payload) (rsp *Payload, err error) { - pool := r.Pool() - if pool == nil { - return nil, fmt.Errorf("no associared pool") + if srv.cmd, err = srv.cfg.makeCommand(); err != nil { + return err } - return pool.Exec(rqs) -} + if srv.factory, err = srv.cfg.makeFactory(); err != nil { + return err + } -// Workers returns worker list associated with the pool. -func (r *Server) Workers() (workers []*Worker) { - pool := r.Pool() - if pool == nil { - return nil + if srv.pool, err = NewPool(srv.cmd, srv.factory, *srv.cfg.Pool); err != nil { + return err } - return pool.Workers() + srv.pool.Observe(srv.poolObserver) + srv.started = true + return nil } -// Destroy all underlying pools and workers workers (but let them to complete the task). -func (r *Server) Destroy() { - r.mu.Lock() - defer r.mu.Unlock() +// Stop underlying worker pool and close the factory. +func (srv *Server) Stop() error { + srv.mu.Lock() + defer srv.mu.Unlock() - if r.pool == nil { - return + if !srv.started { + return nil } - go func(p Pool) { - r.throw(EventDestroyPool, p) - p.Destroy() - }(r.pool) + srv.throw(EventDestroyPool, srv.pool) + srv.pool.Destroy() + srv.factory.Close() + + srv.cmd = nil + srv.factory = nil + srv.pool = nil + srv.started = false - r.pool = nil + return nil } -// Start the server underlying worker pool and factory. -func (r *Server) Start() error { - if r.factory != nil { - //todo: already have started - return nil +// Exec one task with given payload and context, returns result or error. +func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { + pool := srv.Pool() + if pool == nil { + return nil, fmt.Errorf("no associared pool") } - return nil + return pool.Exec(rqs) } -// Stop the server and close underlying factory. -func (r *Server) Stop() { - r.mu.Lock() - defer r.mu.Unlock() +// Workers returns worker list associated with the server pool. +func (srv *Server) Workers() (workers []*Worker) { + p := srv.Pool() + if p == nil { + return nil + } - r.factory.Close() - r.factory = nil + return p.Workers() } -// throw invokes event handler if any. -func (r *Server) throw(event int, ctx interface{}) { - if r.observer != nil { - r.observer(event, ctx) - } +// Pool returns active pool or error. +func (srv *Server) Pool() Pool { + srv.mu.Lock() + defer srv.mu.Unlock() + + return srv.pool } // Observe pool events. -func (r *Server) poolObserver(event int, ctx interface{}) { +func (srv *Server) poolObserver(event int, ctx interface{}) { // bypassing to user specified observer - r.throw(event, ctx) + srv.throw(event, ctx) if event == EventPoolError { // pool failure, rebuilding - r.Reset() + srv.Reset() + } +} + +// throw invokes event handler if any. +func (srv *Server) throw(event int, ctx interface{}) { + if srv.observer != nil { + srv.observer(event, ctx) } } diff --git a/server_config.go b/server_config.go index 72585e06..1491e833 100644 --- a/server_config.go +++ b/server_config.go @@ -44,6 +44,18 @@ type ServerConfig struct { Pool *Config } +// Differs returns true if configuration has changed but ignores pool changes. +func (cfg *ServerConfig) Differs(new *ServerConfig) bool { + // command configuration has changed + if cfg.Command != new.Command || cfg.User != new.User || cfg.Group != new.Group { + return true + } + + // factory configuration has changed + return cfg.Relay != new.Relay || cfg.FactoryTimeout != new.FactoryTimeout +} + +// makeCommands returns new command provider based on configured options. func (cfg *ServerConfig) makeCommand() (func() *exec.Cmd, error) { var ( err error diff --git a/server_test.go b/server_test.go new file mode 100644 index 00000000..3f283dce --- /dev/null +++ b/server_test.go @@ -0,0 +1 @@ +package roadrunner diff --git a/service/registry.go b/service/registry.go index 659ece8b..e2ab0d81 100644 --- a/service/registry.go +++ b/service/registry.go @@ -22,7 +22,7 @@ type Registry interface { // Register add new service to the registry under given name. Register(name string, service Service) - // Configure configures all underlying services with given configuration. + // Reconfigure configures all underlying services with given configuration. Configure(cfg Config) error // Check is Service has been registered and configured. @@ -92,7 +92,7 @@ func (r *registry) Register(name string, service Service) { r.log.Debugf("%s.service: registered", name) } -// Configure configures all underlying services with given configuration. +// Reconfigure configures all underlying services with given configuration. func (r *registry) Configure(cfg Config) error { if r.configured != nil { return fmt.Errorf("service bus has been already configured") |