summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-06 15:55:28 +0300
committerWolfy-J <[email protected]>2018-06-06 15:55:28 +0300
commite863c6cdcf7c318fb251e096bf92812ed98ea03c (patch)
tree9eb2cd049dfae0482211a6a212d28cc48304f109
parent67b88c9914d922314ce7cd8d3624d64812647758 (diff)
better server re-configuration
-rw-r--r--cmd/_____/server.go64
-rw-r--r--config.go2
-rw-r--r--payload.go6
-rw-r--r--server.go204
-rw-r--r--server_config.go12
-rw-r--r--server_test.go1
-rw-r--r--service/registry.go4
7 files changed, 196 insertions, 97 deletions
diff --git a/cmd/_____/server.go b/cmd/_____/server.go
new file mode 100644
index 00000000..5542e7c9
--- /dev/null
+++ b/cmd/_____/server.go
@@ -0,0 +1,64 @@
+package roadrunner
+
+import (
+ "os/exec"
+ "sync"
+)
+
+const (
+ // EventNewPool triggered when server creates new pool.
+ EventNewPool = 60
+
+ // EventDestroyPool triggered when server destroys existed pool.
+ EventDestroyPool = 61
+)
+
+// Service manages pool creation and swapping.
+type Server struct {
+ // configures server, pool, cmd creation and factory.
+ scfg *ServerConfig
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // observes pool events (can be attached to multiple pools at the same time)
+ observer func(event int, ctx interface{})
+
+ // creates and connects to workers
+ factory Factory
+
+ // protects pool while the switch
+ mu sync.Mutex
+}
+
+// todo: do assignment
+
+// Reconfigure configures underlying pool and destroys it's previous version if any.
+func (r *Server) Configure(cfg Config) error {
+ r.mu.Lock()
+ previous := r.pool
+ r.mu.Unlock()
+
+ pool, err := NewPool(r.cmd, r.factory, cfg)
+ if err != nil {
+ return err
+ }
+
+ r.throw(EventNewPool, pool)
+
+ r.mu.Lock()
+
+ r.cfg, r.pool = cfg, pool
+ r.pool.Observe(r.poolObserver)
+
+ r.mu.Unlock()
+
+ if previous != nil {
+ go func(p Pool) {
+ r.throw(EventDestroyPool, p)
+ p.Destroy()
+ }(previous)
+ }
+
+ return nil
+}
diff --git a/config.go b/config.go
index 0ddaa14e..6813fd1e 100644
--- a/config.go
+++ b/config.go
@@ -25,7 +25,7 @@ type Config struct {
DestroyTimeout time.Duration
}
-// Configure returns error if cfg not valid
+// Reconfigure returns error if cfg not valid
func (cfg *Config) Valid() error {
if cfg.NumWorkers == 0 {
return fmt.Errorf("cfg.NumWorkers must be set")
diff --git a/payload.go b/payload.go
index 4cfcb68c..87472fa1 100644
--- a/payload.go
+++ b/payload.go
@@ -3,11 +3,13 @@ package roadrunner
// Payload carries binary header and body to workers and
// back to the server.
type Payload struct {
- // Context represent payload context, might be omitted
+ // Context represent payload context, might be omitted.
Context []byte
- // body contains binary payload to be processed by worker
+ // body contains binary payload to be processed by worker.
Body []byte
+
+ // todo: io.Reader support for streamed requests and responses.
}
// String returns payload body as string
diff --git a/server.go b/server.go
index 9a24cb39..08d96a6b 100644
--- a/server.go
+++ b/server.go
@@ -1,75 +1,88 @@
package roadrunner
import (
- "fmt"
- "os/exec"
"sync"
+ "os/exec"
+ "fmt"
+ "errors"
)
const (
// EventNewPool triggered when server creates new pool.
- EventNewPool = 60
+ EventStart = iota + 128
+
+ // EventNewPool triggered when server creates new pool.
+ EventStop
+
+ // EventNewPool triggered when server creates new pool.
+ EventNewPool
// EventDestroyPool triggered when server destroys existed pool.
- EventDestroyPool = 61
+ EventDestroyPool
)
// Service manages pool creation and swapping.
type Server struct {
- // worker command creator
- cmd func() *exec.Cmd
-
- // defines server wide configuration, behaviour and timeouts.
- config ServerConfig
+ // configures server, pool, cmd creation and factory.
+ cfg *ServerConfig
// observes pool events (can be attached to multiple pools at the same time)
observer func(event int, ctx interface{})
- // creates and connects to workers
- factory Factory
-
- // protects pool while the switch
+ // protects pool while the re-configuration
mu sync.Mutex
- // pool behaviour
- cfg Config
+ // indicates that server was started
+ started bool
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to workers
+ factory Factory
// currently active pool instance
pool Pool
}
// NewServer creates new router. Make sure to call configure before the usage.
-func NewServer(cmd func() *exec.Cmd, factory Factory) *Server {
- return &Server{
- cmd: cmd,
- factory: factory,
- }
+func NewServer(cfg *ServerConfig) *Server {
+ return &Server{cfg: cfg}
}
-// Configure configures underlying pool and destroys it's previous version if any.
-func (r *Server) Configure(cfg Config) error {
- r.mu.Lock()
- previous := r.pool
- r.mu.Unlock()
-
- pool, err := NewPool(r.cmd, r.factory, cfg)
- if err != nil {
- return err
+// Reconfigure re-configures underlying pool and destroys it's previous version if any.
+func (srv *Server) Reconfigure(cfg *ServerConfig) error {
+ srv.mu.Lock()
+ if !srv.started {
+ srv.cfg = cfg
+ return nil
}
+ srv.mu.Unlock()
- r.throw(EventNewPool, pool)
+ // we are not allowing factory or cmd changes while the server is running.
+ if srv.cfg.Differs(cfg) {
+ return errors.New("config change while running server (only pool config change is allowed)")
+ }
- r.mu.Lock()
+ srv.mu.Lock()
+ previous := srv.pool
+ srv.mu.Unlock()
- r.cfg, r.pool = cfg, pool
- r.pool.Observe(r.poolObserver)
+ pool, err := NewPool(srv.cmd, srv.factory, *cfg.Pool)
+ if err != nil {
+ return err
+ }
+ srv.throw(EventNewPool, pool)
- r.mu.Unlock()
+ srv.mu.Lock()
+ srv.cfg, srv.pool = cfg, pool
+ srv.pool.Observe(srv.poolObserver)
+ srv.mu.Unlock()
if previous != nil {
- go func(p Pool) {
- r.throw(EventDestroyPool, p)
- p.Destroy()
+ go func(previous Pool) {
+ srv.throw(EventDestroyPool, previous)
+ previous.Destroy()
}(previous)
}
@@ -77,93 +90,100 @@ func (r *Server) Configure(cfg Config) error {
}
// Reset resets the state of underlying pool and rebuilds all of it's workers.
-func (r *Server) Reset() error {
- return r.Configure(r.cfg)
+func (srv *Server) Reset() error {
+ return srv.Reconfigure(srv.cfg)
}
// Observe attaches event watcher to the router.
-func (r *Server) Observe(o func(event int, ctx interface{})) {
- r.observer = o
+func (srv *Server) Observe(o func(event int, ctx interface{})) {
+ srv.observer = o
}
-// Pool returns active pool or error.
-func (r *Server) Pool() Pool {
- r.mu.Lock()
- defer r.mu.Unlock()
-
- return r.pool
-}
+// Start underlying worker pool, configure factory and command provider.
+func (srv *Server) Start() (err error) {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
-// Exec one task with given payload and context, returns result or error.
-func (r *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
- pool := r.Pool()
- if pool == nil {
- return nil, fmt.Errorf("no associared pool")
+ if srv.cmd, err = srv.cfg.makeCommand(); err != nil {
+ return err
}
- return pool.Exec(rqs)
-}
+ if srv.factory, err = srv.cfg.makeFactory(); err != nil {
+ return err
+ }
-// Workers returns worker list associated with the pool.
-func (r *Server) Workers() (workers []*Worker) {
- pool := r.Pool()
- if pool == nil {
- return nil
+ if srv.pool, err = NewPool(srv.cmd, srv.factory, *srv.cfg.Pool); err != nil {
+ return err
}
- return pool.Workers()
+ srv.pool.Observe(srv.poolObserver)
+ srv.started = true
+ return nil
}
-// Destroy all underlying pools and workers workers (but let them to complete the task).
-func (r *Server) Destroy() {
- r.mu.Lock()
- defer r.mu.Unlock()
+// Stop underlying worker pool and close the factory.
+func (srv *Server) Stop() error {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
- if r.pool == nil {
- return
+ if !srv.started {
+ return nil
}
- go func(p Pool) {
- r.throw(EventDestroyPool, p)
- p.Destroy()
- }(r.pool)
+ srv.throw(EventDestroyPool, srv.pool)
+ srv.pool.Destroy()
+ srv.factory.Close()
+
+ srv.cmd = nil
+ srv.factory = nil
+ srv.pool = nil
+ srv.started = false
- r.pool = nil
+ return nil
}
-// Start the server underlying worker pool and factory.
-func (r *Server) Start() error {
- if r.factory != nil {
- //todo: already have started
- return nil
+// Exec one task with given payload and context, returns result or error.
+func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
+ pool := srv.Pool()
+ if pool == nil {
+ return nil, fmt.Errorf("no associared pool")
}
- return nil
+ return pool.Exec(rqs)
}
-// Stop the server and close underlying factory.
-func (r *Server) Stop() {
- r.mu.Lock()
- defer r.mu.Unlock()
+// Workers returns worker list associated with the server pool.
+func (srv *Server) Workers() (workers []*Worker) {
+ p := srv.Pool()
+ if p == nil {
+ return nil
+ }
- r.factory.Close()
- r.factory = nil
+ return p.Workers()
}
-// throw invokes event handler if any.
-func (r *Server) throw(event int, ctx interface{}) {
- if r.observer != nil {
- r.observer(event, ctx)
- }
+// Pool returns active pool or error.
+func (srv *Server) Pool() Pool {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+
+ return srv.pool
}
// Observe pool events.
-func (r *Server) poolObserver(event int, ctx interface{}) {
+func (srv *Server) poolObserver(event int, ctx interface{}) {
// bypassing to user specified observer
- r.throw(event, ctx)
+ srv.throw(event, ctx)
if event == EventPoolError {
// pool failure, rebuilding
- r.Reset()
+ srv.Reset()
+ }
+}
+
+// throw invokes event handler if any.
+func (srv *Server) throw(event int, ctx interface{}) {
+ if srv.observer != nil {
+ srv.observer(event, ctx)
}
}
diff --git a/server_config.go b/server_config.go
index 72585e06..1491e833 100644
--- a/server_config.go
+++ b/server_config.go
@@ -44,6 +44,18 @@ type ServerConfig struct {
Pool *Config
}
+// Differs returns true if configuration has changed but ignores pool changes.
+func (cfg *ServerConfig) Differs(new *ServerConfig) bool {
+ // command configuration has changed
+ if cfg.Command != new.Command || cfg.User != new.User || cfg.Group != new.Group {
+ return true
+ }
+
+ // factory configuration has changed
+ return cfg.Relay != new.Relay || cfg.FactoryTimeout != new.FactoryTimeout
+}
+
+// makeCommands returns new command provider based on configured options.
func (cfg *ServerConfig) makeCommand() (func() *exec.Cmd, error) {
var (
err error
diff --git a/server_test.go b/server_test.go
new file mode 100644
index 00000000..3f283dce
--- /dev/null
+++ b/server_test.go
@@ -0,0 +1 @@
+package roadrunner
diff --git a/service/registry.go b/service/registry.go
index 659ece8b..e2ab0d81 100644
--- a/service/registry.go
+++ b/service/registry.go
@@ -22,7 +22,7 @@ type Registry interface {
// Register add new service to the registry under given name.
Register(name string, service Service)
- // Configure configures all underlying services with given configuration.
+ // Reconfigure configures all underlying services with given configuration.
Configure(cfg Config) error
// Check is Service has been registered and configured.
@@ -92,7 +92,7 @@ func (r *registry) Register(name string, service Service) {
r.log.Debugf("%s.service: registered", name)
}
-// Configure configures all underlying services with given configuration.
+// Reconfigure configures all underlying services with given configuration.
func (r *registry) Configure(cfg Config) error {
if r.configured != nil {
return fmt.Errorf("service bus has been already configured")