diff options
-rw-r--r-- | cmd/rr/cmd/stop.go | 4 | ||||
-rw-r--r-- | server.go | 49 | ||||
-rw-r--r-- | service/container.go | 4 | ||||
-rw-r--r-- | service/http/service.go | 2 | ||||
-rw-r--r-- | service/rpc/service.go | 2 | ||||
-rw-r--r-- | service/rpc/system.go | 2 | ||||
-rw-r--r-- | service/watcher/watcher.go | 49 | ||||
-rw-r--r-- | watcher.go | 8 | ||||
-rw-r--r-- | worker.go | 2 |
9 files changed, 58 insertions, 64 deletions
diff --git a/cmd/rr/cmd/stop.go b/cmd/rr/cmd/stop.go index 8beee73e..2f48615b 100644 --- a/cmd/rr/cmd/stop.go +++ b/cmd/rr/cmd/stop.go @@ -28,7 +28,7 @@ import ( func init() { CLI.AddCommand(&cobra.Command{ Use: "stop", - Short: "Stop RoadRunner server", + Short: "Detach RoadRunner server", RunE: stopHandler, }) } @@ -43,7 +43,7 @@ func stopHandler(cmd *cobra.Command, args []string) error { util.Printf("<green>Stopping RoadRunner</reset>: ") var r string - if err := client.Call("system.Stop", true, &r); err != nil { + if err := client.Call("system.Detach", true, &r); err != nil { return err } @@ -37,9 +37,13 @@ type Server struct { // creates and connects to workers factory Factory + // associated pool watcher + watcher Watcher + // currently active pool instance - mup sync.Mutex - pool Pool + mup sync.Mutex + pool Pool + pWatcher Watcher // observes pool events (can be attached to multiple pools at the same time) mul sync.Mutex @@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) { s.lsn = l } +// Listen attaches server event watcher. +func (s *Server) Watch(w Watcher) { + s.mu.Lock() + defer s.mu.Unlock() + + s.watcher = w + + s.mul.Lock() + if s.pWatcher != nil && s.pool != nil { + s.pWatcher.Detach() + s.pWatcher = s.watcher.Attach(s.pool) + } + s.mul.Unlock() +} + // Start underlying worker pool, configure factory and command provider. func (s *Server) Start() (err error) { s.mu.Lock() @@ -72,6 +91,10 @@ func (s *Server) Start() (err error) { return err } + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(s.pool) + } + s.pool.Listen(s.poolListener) s.started = true s.throw(EventServerStart, s) @@ -79,7 +102,7 @@ func (s *Server) Start() (err error) { return nil } -// Stop underlying worker pool and close the factory. +// Detach underlying worker pool and close the factory. func (s *Server) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +112,12 @@ func (s *Server) Stop() { } s.throw(EventPoolDestruct, s.pool) + + if s.pWatcher != nil { + s.pWatcher.Detach() + s.pWatcher = nil + } + s.pool.Destroy() s.factory.Close() @@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() previous := s.pool + pWatcher := s.pWatcher s.mu.Unlock() pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) @@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error { s.mu.Lock() s.cfg.Pool, s.pool = cfg.Pool, pool + + if s.watcher != nil { + s.pWatcher = s.watcher.Attach(pool) + } + s.mu.Unlock() s.throw(EventPoolConstruct, pool) if previous != nil { - go func(previous Pool) { + go func(previous Pool, pWatcher Watcher) { s.throw(EventPoolDestruct, previous) + if pWatcher != nil { + pWatcher.Detach() + } + previous.Destroy() - }(previous) + }(previous, pWatcher) } return nil diff --git a/service/container.go b/service/container.go index 275cfffd..6aa99614 100644 --- a/service/container.go +++ b/service/container.go @@ -22,7 +22,7 @@ type Service interface { // Serve serves. Serve() error - // Stop stops the service. + // Detach stops the service. Stop() } @@ -198,7 +198,7 @@ func (c *container) Serve() error { return nil } -// Stop sends stop command to all running services. +// Detach sends stop command to all running services. func (c *container) Stop() { for _, e := range c.services { if e.hasStatus(StatusServing) { diff --git a/service/http/service.go b/service/http/service.go index 6e4542bf..1a06a76a 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -103,7 +103,7 @@ func (s *Service) Serve() error { return <-err } -// Stop stops the svc. +// Detach stops the svc. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() diff --git a/service/rpc/service.go b/service/rpc/service.go index eba74c2d..066124a2 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -83,7 +83,7 @@ func (s *Service) Serve() error { return nil } -// Stop stops the service. +// Detach stops the service. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() diff --git a/service/rpc/system.go b/service/rpc/system.go index 778250ad..d1368a05 100644 --- a/service/rpc/system.go +++ b/service/rpc/system.go @@ -7,7 +7,7 @@ type systemService struct { c service.Container } -// Stop the underlying c. +// Detach the underlying c. func (s *systemService) Stop(stop bool, r *string) error { if stop { s.c.Stop() diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go index 9cf129df..8278790e 100644 --- a/service/watcher/watcher.go +++ b/service/watcher/watcher.go @@ -1,50 +1 @@ package watcher - -import ( - "log" - "time" -) - -// disconnect?? -type Service struct { - // defines how often - interval time.Duration - pool Pool - - stop chan interface{} -} - -// NewWatcher creates new pool watcher. -func NewWatcher(p Pool, i time.Duration) *Service { - w := &Service{ - interval: i, - pool: p, - stop: make(chan interface{}), - } - - go func() { - ticker := time.NewTicker(w.interval) - - for { - select { - case <-ticker.C: - w.update() - case <-w.stop: - return - } - } - }() - - return w -} - -func (w *Service) Stop() { - close(w.stop) -} - -func (w *Service) update() { - for _, w := range w.pool.Workers() { - log.Println(w) - - } -} @@ -1,6 +1,10 @@ package roadrunner -// disconnect?? +// Watcher observes pool state and decides if any worker must be destroyed. type Watcher interface { - // ?? lock on pool + // Lock watcher on given pool instance. + Attach(p Pool) Watcher + + // Detach pool watching. + Detach() } @@ -127,7 +127,7 @@ func (w *Worker) Wait() error { return &exec.ExitError{ProcessState: w.endState} } -// Stop sends soft termination command to the worker and waits for process completion. +// Detach sends soft termination command to the worker and waits for process completion. func (w *Worker) Stop() error { select { case <-w.waitDone: |