Diffstat:
 pool.go             |  2
 pool_supervisor.go  |  2
 process_state.go    |  4
 static_pool.go      | 12
 static_pool_test.go | 76
 worker.go           | 77
 workers_watcher.go  | 76
 7 files changed, 123 insertions(+), 126 deletions(-)
diff --git a/pool.go b/pool.go
index 540abb4c..67d092c0 100644
--- a/pool.go
+++ b/pool.go
@@ -50,7 +50,7 @@ type Pool interface {
Exec(ctx context.Context, rqs Payload) (Payload, error)
// Workers returns worker list associated with the pool.
- Workers(ctx context.Context) (workers []WorkerBase)
+ Workers() (workers []WorkerBase)
RemoveWorker(ctx context.Context, worker WorkerBase) error
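Dropping context.Context from Workers() reflects that listing workers is now a plain in-memory snapshot rather than a cancellable call. A minimal caller sketch against the new signature (the printing is illustrative only):

    // p is a Pool; Workers() now takes no arguments.
    for _, w := range p.Workers() {
        fmt.Printf("pid=%v state=%v\n", w.Pid(), w.State().Value())
    }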
diff --git a/pool_supervisor.go b/pool_supervisor.go
index cadf5f9c..93afb8c6 100644
--- a/pool_supervisor.go
+++ b/pool_supervisor.go
@@ -99,7 +99,7 @@ func (sps *staticPoolSupervisor) control() error {
ctx := context.TODO()
// THIS IS A COPY OF WORKERS
- workers := sps.pool.Workers(ctx)
+ workers := sps.pool.Workers()
var totalUsedMemory uint64
for i := 0; i < len(workers); i++ {
diff --git a/process_state.go b/process_state.go
index 747fa8a8..1a4c4d65 100644
--- a/process_state.go
+++ b/process_state.go
@@ -1,8 +1,6 @@
package roadrunner
import (
- "context"
-
"github.com/shirou/gopsutil/process"
)
@@ -45,7 +43,7 @@ func WorkerProcessState(w WorkerBase) (ProcessState, error) {
// PoolState returns the list of all worker states of a given rr server.
func PoolState(pool Pool) ([]ProcessState, error) {
result := make([]ProcessState, 0)
- for _, w := range pool.Workers(context.TODO()) {
+ for _, w := range pool.Workers() {
state, err := WorkerProcessState(w)
if err != nil {
return nil, err
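With the context parameter gone from Workers(), process_state.go can drop its context import as well. A hedged usage sketch (ProcessState's fields are not shown in this diff, so the output format is assumed):

    states, err := PoolState(p)
    if err != nil {
        log.Fatal(err)
    }
    for _, s := range states {
        fmt.Printf("%+v\n", s)
    }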
diff --git a/static_pool.go b/static_pool.go
index 1444e95a..2e72864d 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -51,7 +51,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
events: make(chan PoolEvent),
}
- p.ww = NewWorkerWatcher(func(args ...interface{}) (*SyncWorker, error) {
+ p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
if err != nil {
return nil, err
@@ -61,7 +61,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
if err != nil {
return nil, err
}
- return &sw, nil
+ return sw, nil
}, p.cfg.NumWorkers, p.events)
workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
@@ -84,10 +84,8 @@ func (p *StaticPool) Config() Config {
}
// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers(ctx context.Context) (workers []WorkerBase) {
- p.muw.RLock()
- defer p.muw.RUnlock()
- return p.ww.WorkersList(ctx)
+func (p *StaticPool) Workers() (workers []WorkerBase) {
+ return p.ww.WorkersList()
}
func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
@@ -136,7 +134,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(ctx)
if err != nil {
- panic(err)
+ return EmptyPayload, err
}
return p.Exec(ctx, rqs)
}
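Two behavioral changes in static_pool.go: the allocator closure now returns the WorkerBase interface instead of the concrete *SyncWorker, and a failed Stop of an invalid worker is returned to the caller instead of panicking. Callers can treat a broken worker teardown as an ordinary error; a minimal sketch, assuming the Payload shape used in the tests below:

    res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
    if err != nil {
        // a failed worker Stop inside Exec used to panic;
        // now it surfaces here as a normal error
        log.Printf("exec failed: %v", err)
        return
    }
    _ = res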
diff --git a/static_pool_test.go b/static_pool_test.go
index a2daedd6..b2ab4713 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -220,29 +220,35 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Nil(t, res.Context)
assert.Equal(t, "hello", res.String())
- assert.Equal(t, runtime.NumCPU(), len(p.Workers(ctx)))
+ assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
// Consume pool events
+ wg := sync.WaitGroup{}
+ wg.Add(1)
go func() {
for true {
select {
case ev := <-p.Events():
fmt.Println(ev)
+ if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
+ wg.Done()
+ }
}
}
}()
// killing random worker and expecting pool to replace it
- err = p.Workers(ctx)[0].Kill(ctx)
+ err = p.Workers()[0].Kill(ctx)
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- time.Sleep(time.Second * 2)
+ wg.Wait()
- for _, w := range p.Workers(ctx) {
+ list := p.Workers()
+ for _, w := range list {
assert.Equal(t, StateReady, w.State().Value())
}
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
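The Test_StaticPool_Broken_FromOutside change above swaps a fixed two-second sleep for a WaitGroup released when the pool reports EventWorkerConstruct, making the test event-driven rather than timing-dependent. The pattern in isolation (PoolEvent and WorkerEvent as defined in this package; the guarded type assertion is an addition for safety):

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        for ev := range p.Events() {
            if we, ok := ev.Payload.(WorkerEvent); ok && we.Event == EventWorkerConstruct {
                wg.Done()
                return
            }
        }
    }()
    // ... kill a worker here ...
    wg.Wait() // unblocks as soon as the replacement worker is constructed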
@@ -281,7 +288,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
assert.NotNil(t, p)
var lastPID string
- lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
res, _ := p.Exec(ctx, Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
@@ -318,8 +325,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NotNil(t, p)
+ go func() {
+ for {
+ select {
+ case ev := <-p.Events():
+ fmt.Println(ev)
+ }
+ }
+ }()
+
var lastPID string
- lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
if err != nil {
@@ -395,29 +411,31 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
}
// identical to replace but controlled on worker side
-// TODO inconsistent state
-//func Test_Static_Pool_Handle_Dead(t *testing.T) {
-// p, err := NewPool(
-// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
-// NewPipeFactory(),
-// Config{
-// NumWorkers: 5,
-// AllocateTimeout: time.Second,
-// DestroyTimeout: time.Second,
-// },
-// )
-// assert.NoError(t, err)
-// defer p.Destroy()
-//
-// assert.NotNil(t, p)
-//
-// for _, w := range p.stack {
-// w.state.value = StateErrored
-// }
-//
-// _, err = p.Exec(&Payload{Body: []byte("hello")})
-// assert.Error(t, err)
-//}
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
+ NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ ExecTTL: time.Second * 5,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ for _, w := range p.Workers() {
+ w.State().Set(StateErrored)
+ }
+
+ _, err = p.Exec(ctx, Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+}
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
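The long-disabled Test_Static_Pool_Handle_Dead returns on top of the public API: rather than poking the unexported stack and state fields, it drives every worker into StateErrored through WorkerBase and asserts that Exec fails:

    for _, w := range p.Workers() {
        w.State().Set(StateErrored) // make the pool see every worker as dead
    }
    _, err = p.Exec(ctx, Payload{Body: []byte("hello")})
    assert.Error(t, err)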
diff --git a/worker.go b/worker.go
index 855a9958..82bd99df 100644
--- a/worker.go
+++ b/worker.go
@@ -18,7 +18,7 @@ import (
// EventWorkerKill thrown after WorkerProcess is forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess error. Expect payload to be an error.
- EventWorkerError int64 = iota + 100
+ EventWorkerError int64 = iota + 200
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Expect payload to be a []byte string.
EventWorkerLog
@@ -200,49 +200,37 @@ func (w *WorkerProcess) Events() <-chan WorkerEvent {
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
func (w *WorkerProcess) Wait(ctx context.Context) error {
- c := make(chan error)
- go func() {
- err := multierr.Combine(w.cmd.Wait())
- w.endState = w.cmd.ProcessState
- if err != nil {
- w.state.Set(StateErrored)
- // if there are messages in the events channel, read it
- // TODO potentially danger place
- if len(w.events) > 0 {
- select {
- case ev := <-w.events:
- err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
- }
- }
- // if no errors in the events, error might be in the errbuffer
- if w.errBuffer.Len() > 0 {
- err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ err := multierr.Combine(w.cmd.Wait())
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(StateErrored)
+ // if there are messages in the events channel, read them
+ // TODO: potentially dangerous place
+ if len(w.events) > 0 {
+ select {
+ case ev := <-w.events:
+ err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
}
-
- c <- multierr.Append(err, w.closeRelay())
- return
}
-
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(StateErrored)
- c <- err
- return
+ // if no errors in the events, error might be in the errbuffer
+ if w.errBuffer.Len() > 0 {
+ err = multierr.Append(err, errors.New(w.errBuffer.String()))
}
- if w.endState.Success() {
- w.state.Set(StateStopped)
- }
- c <- nil
- }()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- return err
- }
+ return multierr.Append(err, w.closeRelay())
+ }
+
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(StateErrored)
+ return err
}
+
+ if w.endState.Success() {
+ w.state.Set(StateStopped)
+ }
+
+ return nil
}
func (w *WorkerProcess) closeRelay() error {
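The Wait refactor removes the inner goroutine, the result channel, and the select on ctx.Done(), so Wait now blocks until the process actually exits and the ctx argument is effectively unused. A caller that relied on cancellation interrupting Wait must reintroduce it; a minimal wrapper sketch, assuming w is a WorkerBase:

    done := make(chan error, 1)
    go func() { done <- w.Wait(ctx) }()
    select {
    case err := <-done:
        _ = err // process exited; err aggregates exit, event and relay errors
    case <-ctx.Done():
        _ = w.Kill(ctx) // Wait is still blocked; force the worker down
    }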
@@ -259,20 +247,15 @@ func (w *WorkerProcess) closeRelay() error {
func (w *WorkerProcess) Stop(ctx context.Context) error {
c := make(chan error)
go func() {
- var errs []string
+ var err error
w.errBuffer.Close()
w.state.Set(StateStopping)
w.mu.Lock()
defer w.mu.Unlock()
- err := sendControl(w.relay, &stopCommand{Stop: true})
+ err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
if err != nil {
- errs = append(errs, err.Error())
w.state.Set(StateKilling)
- err = w.cmd.Process.Kill()
- if err != nil {
- errs = append(errs, err.Error())
- }
- c <- errors.New(strings.Join(errs, "|"))
+ c <- multierr.Append(err, w.cmd.Process.Kill())
}
w.state.Set(StateStopped)
c <- nil
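Stop now aggregates failures with go.uber.org/multierr instead of joining error strings, keeping the individual errors inspectable. The relevant multierr calls, shown on hypothetical helpers stepOne and stepTwo:

    err := multierr.Combine(stepOne(), stepTwo()) // merge several calls at once
    for _, e := range multierr.Errors(err) {
        log.Println(e) // each underlying error is still a distinct value
    }

One caveat visible in the hunk: the error branch sends on c without returning, so execution falls through to the second send (c <- nil); whether that blocks depends on the receiver, which is outside this diff.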
diff --git a/workers_watcher.go b/workers_watcher.go
index f8522c46..d9d27196 100644
--- a/workers_watcher.go
+++ b/workers_watcher.go
@@ -60,9 +60,9 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
}
type WorkersWatcher struct {
- mutex sync.Mutex
+ mutex sync.RWMutex
stack *Stack
- allocator func(args ...interface{}) (*SyncWorker, error)
+ allocator func(args ...interface{}) (WorkerBase, error)
initialNumWorkers int64
actualNumWorkers int64
events chan PoolEvent
@@ -80,13 +80,13 @@ type WorkerWatcher interface {
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- WorkersList(ctx context.Context) []WorkerBase
+ WorkersList() []WorkerBase
// RemoveWorker remove worker from the stack
RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// allocator can be nil, but in that case, dead workers will not be replaced
-func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
+func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
// todo check if events not nil
ww := &WorkersWatcher{
stack: NewWorkersStack(),
@@ -106,11 +106,10 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase)
return err
}
ww.stack.Push(sw)
+ ww.watch(sw)
+
go func(swc WorkerBase) {
- //ww.mutex.Lock()
- ww.watch(&swc)
- ww.wait(ctx, &swc)
- //ww.mutex.Unlock()
+ ww.wait(ctx, swc)
}(sw)
}
return nil
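Note the ordering fix in AddToWatch: watch(sw) is now registered synchronously, before the waiter goroutine starts, so worker events can no longer arrive ahead of the watcher. It also drops the &swc pointer-to-interface arguments, part of the broader *WorkerBase cleanup in this file.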
@@ -146,14 +145,14 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
if w == nil {
continue
}
- ww.actualNumWorkers--
+ ww.decreaseNumOfActualWorkers()
return w, nil
case <-tout.C:
return nil, errors.New("no free stack")
}
}
}
- ww.actualNumWorkers--
+ ww.decreaseNumOfActualWorkers()
return w, nil
}
@@ -163,9 +162,9 @@ func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
if err != nil {
return err
}
- ww.addToWatch(*sw)
+ ww.addToWatch(sw)
ww.stack.mutex.Unlock()
- ww.PushWorker(*sw)
+ ww.PushWorker(sw)
return nil
}
@@ -202,9 +201,7 @@ func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
}
func (ww *WorkersWatcher) ReduceWorkersCount() {
- ww.mutex.Unlock()
- ww.actualNumWorkers--
- ww.mutex.Lock()
+ ww.decreaseNumOfActualWorkers()
}
// Destroy all underlying workers (but let them complete the task)
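The removed ReduceWorkersCount body unlocked ww.mutex before locking it, which faults at runtime on an unheld sync.Mutex ("sync: unlock of unlocked mutex") unless every caller already held the lock; routing the decrement through decreaseNumOfActualWorkers (defined at the end of this diff) restores the conventional Lock-then-Unlock order.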
@@ -217,9 +214,12 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
for {
select {
case <-tt.C:
+ ww.stack.mutex.Lock()
if len(ww.stack.workers) != int(ww.actualNumWorkers) {
+ ww.stack.mutex.Unlock()
continue
}
+ ww.stack.mutex.Unlock()
// unnecessary mutex, but
// just to make sure. All workers at this moment are in the stack
// Pop operation is blocked, push can't be done, since it's not possible to pop
@@ -238,53 +238,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation
-func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase {
+func (ww *WorkersWatcher) WorkersList() []WorkerBase {
return ww.stack.workers
}
-func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) {
- err := (*w).Wait(ctx)
+func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) {
+ err := w.Wait(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
}
// If not destroyed, reallocate
- if (*w).State().Value() != StateDestroyed {
- pid := (*w).Pid()
+ if w.State().Value() != StateDestroyed {
+ pid := w.Pid()
+ ww.stack.mutex.Lock()
for i := 0; i < len(ww.stack.workers); i++ {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
- ww.stack.mutex.Lock()
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
-
ww.decreaseNumOfActualWorkers()
-
ww.stack.mutex.Unlock()
err = ww.AllocateNew(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
return
}
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerConstruct,
+ Worker: nil,
+ Payload: nil,
+ }}
return
}
}
+ ww.stack.mutex.Unlock()
// worker not in the stack (not returned), forget and allocate new
err = ww.AllocateNew(ctx)
if err != nil {
ww.events <- PoolEvent{Payload: WorkerEvent{
Event: EventWorkerError,
- Worker: *w,
+ Worker: w,
Payload: err,
}}
return
}
+ ww.events <- PoolEvent{Payload: WorkerEvent{
+ Event: EventWorkerConstruct,
+ Worker: nil,
+ Payload: nil,
+ }}
}
return
}
@@ -293,30 +302,21 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
- ww.wait(context.Background(), &wb)
+ ww.wait(context.Background(), wb)
}()
}
-func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error {
- sw, err := ww.allocator()
- if err != nil {
- return err
- }
- *wb = *sw
- return nil
-}
-
func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
ww.mutex.Lock()
ww.actualNumWorkers--
ww.mutex.Unlock()
}
-func (ww *WorkersWatcher) watch(swc *WorkerBase) {
+func (ww *WorkersWatcher) watch(swc WorkerBase) {
// todo make event to stop function
go func() {
select {
- case ev := <-(*swc).Events():
+ case ev := <-swc.Events():
ww.events <- PoolEvent{Payload: ev}
}
}()
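Taken together, the workers_watcher.go changes retire the *WorkerBase pointer-to-interface pattern. An interface value already carries a pointer to the concrete worker, so passing WorkerBase by value is the idiomatic form; the old style only forced dereferences such as (*swc).Events(). A small illustration:

    func watchByValue(w WorkerBase)    { _ = w.Pid() }    // idiomatic: interface by value
    func watchByPointer(w *WorkerBase) { _ = (*w).Pid() } // removed style: explicit deref, no benefit

The deleted reallocate helper, which mutated the caller's variable through that pointer (*wb = *sw), becomes unnecessary: AllocateNew pushes the freshly allocated WorkerBase straight onto the stack, and wait() now emits EventWorkerConstruct after a successful replacement, which is exactly what the updated test in static_pool_test.go synchronizes on.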