diff options
-rw-r--r-- | balancer.go | 71 | ||||
-rw-r--r-- | balancer_test.go | 1 | ||||
-rw-r--r-- | payload_test.go | 2 | ||||
-rw-r--r-- | pipe_factory.go | 1 | ||||
-rw-r--r-- | pipe_factory_test.go | 1 | ||||
-rw-r--r-- | pool.go | 3 | ||||
-rw-r--r-- | pool_test.go | 2 | ||||
-rw-r--r-- | worker.go | 14 |
8 files changed, 15 insertions, 80 deletions
diff --git a/balancer.go b/balancer.go deleted file mode 100644 index 16418d55..00000000 --- a/balancer.go +++ /dev/null @@ -1,71 +0,0 @@ -package roadrunner - -// -//import ( -// "os/exec" -// "sync" -//) -// -//// Swapper provides ability to perform hot-swap between 2 worker pools. -//type Swapper struct { -// mu sync.Mutex // protects pool hot swapping -// pool *Pool // pool to work for user commands -//} -// -//// Swap initiates underlying pool of workers and replaces old one. -//func (b *Swapper) Swap(cmd func() *exec.Cmd, factory Factory, cfg Config) error { -// var ( -// err error -// prev *Pool -// pool *Pool -// ) -// -// prev = b.pool -// if pool, err = NewPool(cmd, factory, cfg); err != nil { -// return err -// } -// -// if prev != nil { -// go func() { -// prev.Close() -// }() -// } -// -// b.mu.Lock() -// b.pool = pool -// b.mu.Unlock() -// -// return nil -//} -// -//// Exec one task with given payload and context, returns result and context -//// or error. Must not be used once pool is being destroyed. -//func (b *Swapper) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { -// b.mu.Lock() -// pool := b.pool -// b.mu.Unlock() -// -// if pool == nil { -// panic("what") -// } -// -// return pool.Exec(payload, ctx) -//} -// -//// Workers return list of active workers. -//func (b *Swapper) Workers() []*Worker { -// b.mu.Lock() -// pool := b.pool -// b.mu.Unlock() -// -// return pool.Workers() -//} -// -//// Close closes underlying pool. -//func (b *Swapper) Close() { -// b.mu.Lock() -// defer b.mu.Unlock() -// -// b.pool.Close() -// b.pool = nil -//} diff --git a/balancer_test.go b/balancer_test.go deleted file mode 100644 index 3f283dce..00000000 --- a/balancer_test.go +++ /dev/null @@ -1 +0,0 @@ -package roadrunner diff --git a/payload_test.go b/payload_test.go index 3f283dce..1721392a 100644 --- a/payload_test.go +++ b/payload_test.go @@ -1 +1,3 @@ package roadrunner + +//todo: need payload test
\ No newline at end of file diff --git a/pipe_factory.go b/pipe_factory.go index 3caa58ee..36f7a7e3 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -47,7 +47,6 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { // todo: timeout ? if pid, err := fetchPID(w.rl); pid != *w.Pid { go func(w *Worker) { w.Kill() }(w) - if wErr := w.Wait(); wErr != nil { err = errors.Wrap(wErr, err.Error()) } diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 97e94487..f09bed31 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -22,7 +22,6 @@ func Test_Pipe_Start(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) @@ -68,8 +68,11 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { // worker watcher go func(w *Worker) { if err := w.Wait(); err != nil { + // todo: register error log.Println(err) + + //todo: automatic replace } }(w) diff --git a/pool_test.go b/pool_test.go index 2706d0c3..e02993b0 100644 --- a/pool_test.go +++ b/pool_test.go @@ -2,13 +2,13 @@ package roadrunner import ( "github.com/stretchr/testify/assert" + "log" "os/exec" "runtime" "strconv" "sync" "testing" "time" - "log" ) var cfg = Config{ @@ -7,6 +7,7 @@ import ( "github.com/spiral/goridge" "os" "os/exec" + "runtime" "strconv" "strings" "sync" @@ -100,17 +101,16 @@ func (w *Worker) Start() error { w.Pid = &w.cmd.Process.Pid - // relays for process to complete + // wait for process to complete go func() { w.endState, _ = w.cmd.Process.Wait() if w.waitDone != nil { w.state.set(StateStopped) - close(w.waitDone) + if w.rl != nil { w.mu.Lock() defer w.mu.Unlock() - w.rl.Close() } } @@ -130,7 +130,11 @@ func (w *Worker) Wait() error { w.mu.Lock() defer w.mu.Unlock() - w.cmd.Wait() + if runtime.GOOS != "windows" { + // windows handles processes and close pipes differently, + // we can ignore wait here as process.Wait() already being handled above + w.cmd.Wait() + } if w.endState.Success() { return nil @@ -172,7 +176,7 @@ func (w *Worker) Kill() error { defer w.mu.Unlock() w.state.set(StateInactive) - err := w.cmd.Process.Kill() + err := w.cmd.Process.Signal(os.Kill) <-w.waitDone return err |