summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/cmd/stop.go4
-rw-r--r--server.go49
-rw-r--r--service/container.go4
-rw-r--r--service/http/service.go2
-rw-r--r--service/rpc/service.go2
-rw-r--r--service/rpc/system.go2
-rw-r--r--service/watcher/watcher.go49
-rw-r--r--watcher.go8
-rw-r--r--worker.go2
9 files changed, 58 insertions, 64 deletions
diff --git a/cmd/rr/cmd/stop.go b/cmd/rr/cmd/stop.go
index 8beee73e..2f48615b 100644
--- a/cmd/rr/cmd/stop.go
+++ b/cmd/rr/cmd/stop.go
@@ -28,7 +28,7 @@ import (
func init() {
CLI.AddCommand(&cobra.Command{
Use: "stop",
- Short: "Stop RoadRunner server",
+ Short: "Detach RoadRunner server",
RunE: stopHandler,
})
}
@@ -43,7 +43,7 @@ func stopHandler(cmd *cobra.Command, args []string) error {
util.Printf("<green>Stopping RoadRunner</reset>: ")
var r string
- if err := client.Call("system.Stop", true, &r); err != nil {
+ if err := client.Call("system.Detach", true, &r); err != nil {
return err
}
diff --git a/server.go b/server.go
index 2672a792..397898f2 100644
--- a/server.go
+++ b/server.go
@@ -37,9 +37,13 @@ type Server struct {
// creates and connects to workers
factory Factory
+ // associated pool watcher
+ watcher Watcher
+
// currently active pool instance
- mup sync.Mutex
- pool Pool
+ mup sync.Mutex
+ pool Pool
+ pWatcher Watcher
// observes pool events (can be attached to multiple pools at the same time)
mul sync.Mutex
@@ -59,6 +63,21 @@ func (s *Server) Listen(l func(event int, ctx interface{})) {
s.lsn = l
}
+// Listen attaches server event watcher.
+func (s *Server) Watch(w Watcher) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.watcher = w
+
+ s.mul.Lock()
+ if s.pWatcher != nil && s.pool != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+ s.mul.Unlock()
+}
+
// Start underlying worker pool, configure factory and command provider.
func (s *Server) Start() (err error) {
s.mu.Lock()
@@ -72,6 +91,10 @@ func (s *Server) Start() (err error) {
return err
}
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(s.pool)
+ }
+
s.pool.Listen(s.poolListener)
s.started = true
s.throw(EventServerStart, s)
@@ -79,7 +102,7 @@ func (s *Server) Start() (err error) {
return nil
}
-// Stop underlying worker pool and close the factory.
+// Detach underlying worker pool and close the factory.
func (s *Server) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -89,6 +112,12 @@ func (s *Server) Stop() {
}
s.throw(EventPoolDestruct, s.pool)
+
+ if s.pWatcher != nil {
+ s.pWatcher.Detach()
+ s.pWatcher = nil
+ }
+
s.pool.Destroy()
s.factory.Close()
@@ -128,6 +157,7 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
previous := s.pool
+ pWatcher := s.pWatcher
s.mu.Unlock()
pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
@@ -139,15 +169,24 @@ func (s *Server) Reconfigure(cfg *ServerConfig) error {
s.mu.Lock()
s.cfg.Pool, s.pool = cfg.Pool, pool
+
+ if s.watcher != nil {
+ s.pWatcher = s.watcher.Attach(pool)
+ }
+
s.mu.Unlock()
s.throw(EventPoolConstruct, pool)
if previous != nil {
- go func(previous Pool) {
+ go func(previous Pool, pWatcher Watcher) {
s.throw(EventPoolDestruct, previous)
+ if pWatcher != nil {
+ pWatcher.Detach()
+ }
+
previous.Destroy()
- }(previous)
+ }(previous, pWatcher)
}
return nil
diff --git a/service/container.go b/service/container.go
index 275cfffd..6aa99614 100644
--- a/service/container.go
+++ b/service/container.go
@@ -22,7 +22,7 @@ type Service interface {
// Serve serves.
Serve() error
- // Stop stops the service.
+ // Detach stops the service.
Stop()
}
@@ -198,7 +198,7 @@ func (c *container) Serve() error {
return nil
}
-// Stop sends stop command to all running services.
+// Detach sends stop command to all running services.
func (c *container) Stop() {
for _, e := range c.services {
if e.hasStatus(StatusServing) {
diff --git a/service/http/service.go b/service/http/service.go
index 6e4542bf..1a06a76a 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -103,7 +103,7 @@ func (s *Service) Serve() error {
return <-err
}
-// Stop stops the svc.
+// Detach stops the svc.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/service/rpc/service.go b/service/rpc/service.go
index eba74c2d..066124a2 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -83,7 +83,7 @@ func (s *Service) Serve() error {
return nil
}
-// Stop stops the service.
+// Detach stops the service.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/service/rpc/system.go b/service/rpc/system.go
index 778250ad..d1368a05 100644
--- a/service/rpc/system.go
+++ b/service/rpc/system.go
@@ -7,7 +7,7 @@ type systemService struct {
c service.Container
}
-// Stop the underlying c.
+// Detach the underlying c.
func (s *systemService) Stop(stop bool, r *string) error {
if stop {
s.c.Stop()
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
index 9cf129df..8278790e 100644
--- a/service/watcher/watcher.go
+++ b/service/watcher/watcher.go
@@ -1,50 +1 @@
package watcher
-
-import (
- "log"
- "time"
-)
-
-// disconnect??
-type Service struct {
- // defines how often
- interval time.Duration
- pool Pool
-
- stop chan interface{}
-}
-
-// NewWatcher creates new pool watcher.
-func NewWatcher(p Pool, i time.Duration) *Service {
- w := &Service{
- interval: i,
- pool: p,
- stop: make(chan interface{}),
- }
-
- go func() {
- ticker := time.NewTicker(w.interval)
-
- for {
- select {
- case <-ticker.C:
- w.update()
- case <-w.stop:
- return
- }
- }
- }()
-
- return w
-}
-
-func (w *Service) Stop() {
- close(w.stop)
-}
-
-func (w *Service) update() {
- for _, w := range w.pool.Workers() {
- log.Println(w)
-
- }
-}
diff --git a/watcher.go b/watcher.go
index 7d9c6060..4527fe68 100644
--- a/watcher.go
+++ b/watcher.go
@@ -1,6 +1,10 @@
package roadrunner
-// disconnect??
+// Watcher observes pool state and decides if any worker must be destroyed.
type Watcher interface {
- // ?? lock on pool
+ // Lock watcher on given pool instance.
+ Attach(p Pool) Watcher
+
+ // Detach pool watching.
+ Detach()
}
diff --git a/worker.go b/worker.go
index b01b362a..d476b918 100644
--- a/worker.go
+++ b/worker.go
@@ -127,7 +127,7 @@ func (w *Worker) Wait() error {
return &exec.ExitError{ProcessState: w.endState}
}
-// Stop sends soft termination command to the worker and waits for process completion.
+// Detach sends soft termination command to the worker and waits for process completion.
func (w *Worker) Stop() error {
select {
case <-w.waitDone: