summaryrefslogtreecommitdiff
path: root/server.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-06 15:55:28 +0300
committerWolfy-J <[email protected]>2018-06-06 15:55:28 +0300
commite863c6cdcf7c318fb251e096bf92812ed98ea03c (patch)
tree9eb2cd049dfae0482211a6a212d28cc48304f109 /server.go
parent67b88c9914d922314ce7cd8d3624d64812647758 (diff)
better server re-configuration
Diffstat (limited to 'server.go')
-rw-r--r--server.go204
1 files changed, 112 insertions, 92 deletions
diff --git a/server.go b/server.go
index 9a24cb39..08d96a6b 100644
--- a/server.go
+++ b/server.go
@@ -1,75 +1,88 @@
package roadrunner
import (
- "fmt"
- "os/exec"
"sync"
+ "os/exec"
+ "fmt"
+ "errors"
)
const (
// EventNewPool triggered when server creates new pool.
- EventNewPool = 60
+ EventStart = iota + 128
+
+ // EventNewPool triggered when server creates new pool.
+ EventStop
+
+ // EventNewPool triggered when server creates new pool.
+ EventNewPool
// EventDestroyPool triggered when server destroys existed pool.
- EventDestroyPool = 61
+ EventDestroyPool
)
// Service manages pool creation and swapping.
type Server struct {
- // worker command creator
- cmd func() *exec.Cmd
-
- // defines server wide configuration, behaviour and timeouts.
- config ServerConfig
+ // configures server, pool, cmd creation and factory.
+ cfg *ServerConfig
// observes pool events (can be attached to multiple pools at the same time)
observer func(event int, ctx interface{})
- // creates and connects to workers
- factory Factory
-
- // protects pool while the switch
+ // protects pool while the re-configuration
mu sync.Mutex
- // pool behaviour
- cfg Config
+ // indicates that server was started
+ started bool
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to workers
+ factory Factory
// currently active pool instance
pool Pool
}
// NewServer creates new router. Make sure to call configure before the usage.
-func NewServer(cmd func() *exec.Cmd, factory Factory) *Server {
- return &Server{
- cmd: cmd,
- factory: factory,
- }
+func NewServer(cfg *ServerConfig) *Server {
+ return &Server{cfg: cfg}
}
-// Configure configures underlying pool and destroys it's previous version if any.
-func (r *Server) Configure(cfg Config) error {
- r.mu.Lock()
- previous := r.pool
- r.mu.Unlock()
-
- pool, err := NewPool(r.cmd, r.factory, cfg)
- if err != nil {
- return err
+// Reconfigure re-configures underlying pool and destroys it's previous version if any.
+func (srv *Server) Reconfigure(cfg *ServerConfig) error {
+ srv.mu.Lock()
+ if !srv.started {
+ srv.cfg = cfg
+ return nil
}
+ srv.mu.Unlock()
- r.throw(EventNewPool, pool)
+ // we are not allowing factory or cmd changes while the server is running.
+ if srv.cfg.Differs(cfg) {
+ return errors.New("config change while running server (only pool config change is allowed)")
+ }
- r.mu.Lock()
+ srv.mu.Lock()
+ previous := srv.pool
+ srv.mu.Unlock()
- r.cfg, r.pool = cfg, pool
- r.pool.Observe(r.poolObserver)
+ pool, err := NewPool(srv.cmd, srv.factory, *cfg.Pool)
+ if err != nil {
+ return err
+ }
+ srv.throw(EventNewPool, pool)
- r.mu.Unlock()
+ srv.mu.Lock()
+ srv.cfg, srv.pool = cfg, pool
+ srv.pool.Observe(srv.poolObserver)
+ srv.mu.Unlock()
if previous != nil {
- go func(p Pool) {
- r.throw(EventDestroyPool, p)
- p.Destroy()
+ go func(previous Pool) {
+ srv.throw(EventDestroyPool, previous)
+ previous.Destroy()
}(previous)
}
@@ -77,93 +90,100 @@ func (r *Server) Configure(cfg Config) error {
}
// Reset resets the state of underlying pool and rebuilds all of it's workers.
-func (r *Server) Reset() error {
- return r.Configure(r.cfg)
+func (srv *Server) Reset() error {
+ return srv.Reconfigure(srv.cfg)
}
// Observe attaches event watcher to the router.
-func (r *Server) Observe(o func(event int, ctx interface{})) {
- r.observer = o
+func (srv *Server) Observe(o func(event int, ctx interface{})) {
+ srv.observer = o
}
-// Pool returns active pool or error.
-func (r *Server) Pool() Pool {
- r.mu.Lock()
- defer r.mu.Unlock()
-
- return r.pool
-}
+// Start underlying worker pool, configure factory and command provider.
+func (srv *Server) Start() (err error) {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
-// Exec one task with given payload and context, returns result or error.
-func (r *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
- pool := r.Pool()
- if pool == nil {
- return nil, fmt.Errorf("no associared pool")
+ if srv.cmd, err = srv.cfg.makeCommand(); err != nil {
+ return err
}
- return pool.Exec(rqs)
-}
+ if srv.factory, err = srv.cfg.makeFactory(); err != nil {
+ return err
+ }
-// Workers returns worker list associated with the pool.
-func (r *Server) Workers() (workers []*Worker) {
- pool := r.Pool()
- if pool == nil {
- return nil
+ if srv.pool, err = NewPool(srv.cmd, srv.factory, *srv.cfg.Pool); err != nil {
+ return err
}
- return pool.Workers()
+ srv.pool.Observe(srv.poolObserver)
+ srv.started = true
+ return nil
}
-// Destroy all underlying pools and workers workers (but let them to complete the task).
-func (r *Server) Destroy() {
- r.mu.Lock()
- defer r.mu.Unlock()
+// Stop underlying worker pool and close the factory.
+func (srv *Server) Stop() error {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
- if r.pool == nil {
- return
+ if !srv.started {
+ return nil
}
- go func(p Pool) {
- r.throw(EventDestroyPool, p)
- p.Destroy()
- }(r.pool)
+ srv.throw(EventDestroyPool, srv.pool)
+ srv.pool.Destroy()
+ srv.factory.Close()
+
+ srv.cmd = nil
+ srv.factory = nil
+ srv.pool = nil
+ srv.started = false
- r.pool = nil
+ return nil
}
-// Start the server underlying worker pool and factory.
-func (r *Server) Start() error {
- if r.factory != nil {
- //todo: already have started
- return nil
+// Exec one task with given payload and context, returns result or error.
+func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
+ pool := srv.Pool()
+ if pool == nil {
+ return nil, fmt.Errorf("no associared pool")
}
- return nil
+ return pool.Exec(rqs)
}
-// Stop the server and close underlying factory.
-func (r *Server) Stop() {
- r.mu.Lock()
- defer r.mu.Unlock()
+// Workers returns worker list associated with the server pool.
+func (srv *Server) Workers() (workers []*Worker) {
+ p := srv.Pool()
+ if p == nil {
+ return nil
+ }
- r.factory.Close()
- r.factory = nil
+ return p.Workers()
}
-// throw invokes event handler if any.
-func (r *Server) throw(event int, ctx interface{}) {
- if r.observer != nil {
- r.observer(event, ctx)
- }
+// Pool returns active pool or error.
+func (srv *Server) Pool() Pool {
+ srv.mu.Lock()
+ defer srv.mu.Unlock()
+
+ return srv.pool
}
// Observe pool events.
-func (r *Server) poolObserver(event int, ctx interface{}) {
+func (srv *Server) poolObserver(event int, ctx interface{}) {
// bypassing to user specified observer
- r.throw(event, ctx)
+ srv.throw(event, ctx)
if event == EventPoolError {
// pool failure, rebuilding
- r.Reset()
+ srv.Reset()
+ }
+}
+
+// throw invokes event handler if any.
+func (srv *Server) throw(event int, ctx interface{}) {
+ if srv.observer != nil {
+ srv.observer(event, ctx)
}
}