summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--balancer.go71
-rw-r--r--balancer_test.go1
-rw-r--r--payload_test.go2
-rw-r--r--pipe_factory.go1
-rw-r--r--pipe_factory_test.go1
-rw-r--r--pool.go3
-rw-r--r--pool_test.go2
-rw-r--r--worker.go14
8 files changed, 15 insertions, 80 deletions
diff --git a/balancer.go b/balancer.go
deleted file mode 100644
index 16418d55..00000000
--- a/balancer.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package roadrunner
-
-//
-//import (
-// "os/exec"
-// "sync"
-//)
-//
-//// Swapper provides ability to perform hot-swap between 2 worker pools.
-//type Swapper struct {
-// mu sync.Mutex // protects pool hot swapping
-// pool *Pool // pool to work for user commands
-//}
-//
-//// Swap initiates underlying pool of workers and replaces old one.
-//func (b *Swapper) Swap(cmd func() *exec.Cmd, factory Factory, cfg Config) error {
-// var (
-// err error
-// prev *Pool
-// pool *Pool
-// )
-//
-// prev = b.pool
-// if pool, err = NewPool(cmd, factory, cfg); err != nil {
-// return err
-// }
-//
-// if prev != nil {
-// go func() {
-// prev.Close()
-// }()
-// }
-//
-// b.mu.Lock()
-// b.pool = pool
-// b.mu.Unlock()
-//
-// return nil
-//}
-//
-//// Exec one task with given payload and context, returns result and context
-//// or error. Must not be used once pool is being destroyed.
-//func (b *Swapper) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
-// b.mu.Lock()
-// pool := b.pool
-// b.mu.Unlock()
-//
-// if pool == nil {
-// panic("what")
-// }
-//
-// return pool.Exec(payload, ctx)
-//}
-//
-//// Workers return list of active workers.
-//func (b *Swapper) Workers() []*Worker {
-// b.mu.Lock()
-// pool := b.pool
-// b.mu.Unlock()
-//
-// return pool.Workers()
-//}
-//
-//// Close closes underlying pool.
-//func (b *Swapper) Close() {
-// b.mu.Lock()
-// defer b.mu.Unlock()
-//
-// b.pool.Close()
-// b.pool = nil
-//}
diff --git a/balancer_test.go b/balancer_test.go
deleted file mode 100644
index 3f283dce..00000000
--- a/balancer_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package roadrunner
diff --git a/payload_test.go b/payload_test.go
index 3f283dce..1721392a 100644
--- a/payload_test.go
+++ b/payload_test.go
@@ -1 +1,3 @@
package roadrunner
+
+//todo: need payload test \ No newline at end of file
diff --git a/pipe_factory.go b/pipe_factory.go
index 3caa58ee..36f7a7e3 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -47,7 +47,6 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
// todo: timeout ?
if pid, err := fetchPID(w.rl); pid != *w.Pid {
go func(w *Worker) { w.Kill() }(w)
-
if wErr := w.Wait(); wErr != nil {
err = errors.Wrap(wErr, err.Error())
}
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 97e94487..f09bed31 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -22,7 +22,6 @@ func Test_Pipe_Start(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
-
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
diff --git a/pool.go b/pool.go
index a14dde26..21637bef 100644
--- a/pool.go
+++ b/pool.go
@@ -68,8 +68,11 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
// worker watcher
go func(w *Worker) {
if err := w.Wait(); err != nil {
+
// todo: register error
log.Println(err)
+
+ //todo: automatic replace
}
}(w)
diff --git a/pool_test.go b/pool_test.go
index 2706d0c3..e02993b0 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -2,13 +2,13 @@ package roadrunner
import (
"github.com/stretchr/testify/assert"
+ "log"
"os/exec"
"runtime"
"strconv"
"sync"
"testing"
"time"
- "log"
)
var cfg = Config{
diff --git a/worker.go b/worker.go
index d581481f..851239f9 100644
--- a/worker.go
+++ b/worker.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/goridge"
"os"
"os/exec"
+ "runtime"
"strconv"
"strings"
"sync"
@@ -100,17 +101,16 @@ func (w *Worker) Start() error {
w.Pid = &w.cmd.Process.Pid
- // relays for process to complete
+ // wait for process to complete
go func() {
w.endState, _ = w.cmd.Process.Wait()
if w.waitDone != nil {
w.state.set(StateStopped)
-
close(w.waitDone)
+
if w.rl != nil {
w.mu.Lock()
defer w.mu.Unlock()
-
w.rl.Close()
}
}
@@ -130,7 +130,11 @@ func (w *Worker) Wait() error {
w.mu.Lock()
defer w.mu.Unlock()
- w.cmd.Wait()
+ if runtime.GOOS != "windows" {
+ // windows handles processes and close pipes differently,
+ // we can ignore wait here as process.Wait() already being handled above
+ w.cmd.Wait()
+ }
if w.endState.Success() {
return nil
@@ -172,7 +176,7 @@ func (w *Worker) Kill() error {
defer w.mu.Unlock()
w.state.set(StateInactive)
- err := w.cmd.Process.Kill()
+ err := w.cmd.Process.Signal(os.Kill)
<-w.waitDone
return err