-rw-r--r-- | pool.go | 2
-rw-r--r-- | pool_supervisor.go | 2
-rw-r--r-- | process_state.go | 4
-rw-r--r-- | static_pool.go | 12
-rw-r--r-- | static_pool_test.go | 76
-rw-r--r-- | worker.go | 77
-rw-r--r-- | workers_watcher.go | 76
7 files changed, 123 insertions, 126 deletions
diff --git a/pool.go b/pool.go
@@ -50,7 +50,7 @@ type Pool interface {
     Exec(ctx context.Context, rqs Payload) (Payload, error)

     // Workers returns worker list associated with the pool.
-    Workers(ctx context.Context) (workers []WorkerBase)
+    Workers() (workers []WorkerBase)

     RemoveWorker(ctx context.Context, worker WorkerBase) error
diff --git a/pool_supervisor.go b/pool_supervisor.go
index cadf5f9c..93afb8c6 100644
--- a/pool_supervisor.go
+++ b/pool_supervisor.go
@@ -99,7 +99,7 @@ func (sps *staticPoolSupervisor) control() error {
     ctx := context.TODO()

     // THIS IS A COPY OF WORKERS
-    workers := sps.pool.Workers(ctx)
+    workers := sps.pool.Workers()

     var totalUsedMemory uint64
     for i := 0; i < len(workers); i++ {
diff --git a/process_state.go b/process_state.go
index 747fa8a8..1a4c4d65 100644
--- a/process_state.go
+++ b/process_state.go
@@ -1,8 +1,6 @@
 package roadrunner

 import (
-    "context"
-
     "github.com/shirou/gopsutil/process"
 )

@@ -45,7 +43,7 @@ func WorkerProcessState(w WorkerBase) (ProcessState, error) {
 // ServerState returns list of all worker states of a given rr server.
 func PoolState(pool Pool) ([]ProcessState, error) {
     result := make([]ProcessState, 0)
-    for _, w := range pool.Workers(context.TODO()) {
+    for _, w := range pool.Workers() {
         state, err := WorkerProcessState(w)
         if err != nil {
             return nil, err
diff --git a/static_pool.go b/static_pool.go
index 1444e95a..2e72864d 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -51,7 +51,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
         events: make(chan PoolEvent),
     }

-    p.ww = NewWorkerWatcher(func(args ...interface{}) (*SyncWorker, error) {
+    p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
         w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
         if err != nil {
             return nil, err
@@ -61,7 +61,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
         if err != nil {
             return nil, err
         }
-        return &sw, nil
+        return sw, nil
     }, p.cfg.NumWorkers, p.events)

     workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
@@ -84,10 +84,8 @@ func (p *StaticPool) Config() Config {
 }

 // Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers(ctx context.Context) (workers []WorkerBase) {
-    p.muw.RLock()
-    defer p.muw.RUnlock()
-    return p.ww.WorkersList(ctx)
+func (p *StaticPool) Workers() (workers []WorkerBase) {
+    return p.ww.WorkersList()
 }

 func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
@@ -136,7 +134,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
         w.State().Set(StateInvalid)
         err = w.Stop(ctx)
         if err != nil {
-            panic(err)
+            return EmptyPayload, err
         }
         return p.Exec(ctx, rqs)
     }
diff --git a/static_pool_test.go b/static_pool_test.go
index a2daedd6..b2ab4713 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -220,29 +220,36 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
     assert.Nil(t, res.Context)

     assert.Equal(t, "hello", res.String())
-    assert.Equal(t, runtime.NumCPU(), len(p.Workers(ctx)))
+    assert.Equal(t, runtime.NumCPU(), len(p.Workers()))

     // Consume pool events
+    wg := sync.WaitGroup{}
+    wg.Add(1)
     go func() {
         for true {
             select {
             case ev := <-p.Events():
                 fmt.Println(ev)
+                if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
+                    wg.Done()
+                }
             }
         }
     }()

     // killing random worker and expecting pool to replace it
-    err = p.Workers(ctx)[0].Kill(ctx)
+    err = p.Workers()[0].Kill(ctx)
     if err != nil {
         t.Errorf("error killing the process: error %v", err)
     }
-    time.Sleep(time.Second * 2)
+    wg.Wait()

-    for _, w := range p.Workers(ctx) {
+    list := p.Workers()
+    for _, w := range list {
         assert.Equal(t, StateReady, w.State().Value())
     }
+    wg.Wait()
 }

 func Test_StaticPool_AllocateTimeout(t *testing.T) {
@@ -281,7 +288,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
     assert.NotNil(t, p)

     var lastPID string
-    lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+    lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))

     res, _ := p.Exec(ctx, Payload{Body: []byte("hello")})
     assert.Equal(t, lastPID, string(res.Body))
@@ -318,8 +325,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {

     assert.NotNil(t, p)

+    go func() {
+        for {
+            select {
+            case ev := <-p.Events():
+                fmt.Println(ev)
+            }
+        }
+    }()
+
     var lastPID string
-    lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+    lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))

     res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
     if err != nil {
@@ -395,29 +411,31 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
 }

 // identical to replace but controlled on worker side
-// TODO inconsistent state
-//func Test_Static_Pool_Handle_Dead(t *testing.T) {
-//    p, err := NewPool(
-//        func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
-//        NewPipeFactory(),
-//        Config{
-//            NumWorkers:      5,
-//            AllocateTimeout: time.Second,
-//            DestroyTimeout:  time.Second,
-//        },
-//    )
-//    assert.NoError(t, err)
-//    defer p.Destroy()
-//
-//    assert.NotNil(t, p)
-//
-//    for _, w := range p.stack {
-//        w.state.value = StateErrored
-//    }
-//
-//    _, err = p.Exec(&Payload{Body: []byte("hello")})
-//    assert.Error(t, err)
-//}
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+    ctx := context.Background()
+    p, err := NewPool(
+        context.Background(),
+        func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
+        NewPipeFactory(),
+        &Config{
+            NumWorkers:      5,
+            AllocateTimeout: time.Second,
+            DestroyTimeout:  time.Second,
+            ExecTTL:         time.Second * 5,
+        },
+    )
+    assert.NoError(t, err)
+    defer p.Destroy(ctx)
+
+    assert.NotNil(t, p)
+
+    for _, w := range p.Workers() {
+        w.State().Set(StateErrored)
+    }
+
+    _, err = p.Exec(ctx, Payload{Body: []byte("hello")})
+    assert.Error(t, err)
+}

 // identical to replace but controlled on worker side
 func Test_Static_Pool_Slow_Destroy(t *testing.T) {
diff --git a/worker.go b/worker.go
@@ -18,7 +18,7 @@ import (
 // EventWorkerKill thrown after WorkerProcess is being forcefully killed.
 const (
     // EventWorkerError triggered after WorkerProcess. Except payload to be error.
-    EventWorkerError int64 = iota + 100
+    EventWorkerError int64 = iota + 200

     // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
     EventWorkerLog
@@ -200,49 +200,37 @@ func (w *WorkerProcess) Events() <-chan WorkerEvent {
 // will be wrapped as WorkerError. Method will return error code if php process fails
 // to find or Start the script.
 func (w *WorkerProcess) Wait(ctx context.Context) error {
-    c := make(chan error)
-    go func() {
-        err := multierr.Combine(w.cmd.Wait())
-        w.endState = w.cmd.ProcessState
-        if err != nil {
-            w.state.Set(StateErrored)
-            // if there are messages in the events channel, read it
-            // TODO potentially danger place
-            if len(w.events) > 0 {
-                select {
-                case ev := <-w.events:
-                    err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
-                }
-            }
-            // if no errors in the events, error might be in the errbuffer
-            if w.errBuffer.Len() > 0 {
-                err = multierr.Append(err, errors.New(w.errBuffer.String()))
+    err := multierr.Combine(w.cmd.Wait())
+    w.endState = w.cmd.ProcessState
+    if err != nil {
+        w.state.Set(StateErrored)
+        // if there are messages in the events channel, read it
+        // TODO potentially danger place
+        if len(w.events) > 0 {
+            select {
+            case ev := <-w.events:
+                err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
             }
-
-            c <- multierr.Append(err, w.closeRelay())
-            return
         }
-
-        err = multierr.Append(err, w.closeRelay())
-        if err != nil {
-            w.state.Set(StateErrored)
-            c <- err
-            return
+        // if no errors in the events, error might be in the errbuffer
+        if w.errBuffer.Len() > 0 {
+            err = multierr.Append(err, errors.New(w.errBuffer.String()))
         }
-        if w.endState.Success() {
-            w.state.Set(StateStopped)
-        }
-        c <- nil
-    }()
-    for {
-        select {
-        case <-ctx.Done():
-            return ctx.Err()
-        case err := <-c:
-            return err
-        }
+        return multierr.Append(err, w.closeRelay())
+    }
+
+    err = multierr.Append(err, w.closeRelay())
+    if err != nil {
+        w.state.Set(StateErrored)
+        return err
     }
+
+    if w.endState.Success() {
+        w.state.Set(StateStopped)
+    }
+
+    return nil
 }

 func (w *WorkerProcess) closeRelay() error {
@@ -259,20 +247,15 @@ func (w *WorkerProcess) closeRelay() error {
 func (w *WorkerProcess) Stop(ctx context.Context) error {
     c := make(chan error)
     go func() {
-        var errs []string
+        var err error
         w.errBuffer.Close()
         w.state.Set(StateStopping)
         w.mu.Lock()
         defer w.mu.Unlock()
-        err := sendControl(w.relay, &stopCommand{Stop: true})
+        err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
         if err != nil {
-            errs = append(errs, err.Error())
             w.state.Set(StateKilling)
-            err = w.cmd.Process.Kill()
-            if err != nil {
-                errs = append(errs, err.Error())
-            }
-            c <- errors.New(strings.Join(errs, "|"))
+            c <- multierr.Append(err, w.cmd.Process.Kill())
         }
         w.state.Set(StateStopped)
         c <- nil
diff --git a/workers_watcher.go b/workers_watcher.go
index f8522c46..d9d27196 100644
--- a/workers_watcher.go
+++ b/workers_watcher.go
@@ -60,9 +60,9 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
 }

 type WorkersWatcher struct {
-    mutex             sync.Mutex
+    mutex             sync.RWMutex
     stack             *Stack
-    allocator         func(args ...interface{}) (*SyncWorker, error)
+    allocator         func(args ...interface{}) (WorkerBase, error)
     initialNumWorkers int64
     actualNumWorkers  int64
     events            chan PoolEvent
@@ -80,13 +80,13 @@ type WorkerWatcher interface {
     // Destroy destroys the underlying stack
     Destroy(ctx context.Context)

     // WorkersList return all stack w/o removing it from internal storage
-    WorkersList(ctx context.Context) []WorkerBase
+    WorkersList() []WorkerBase

     // RemoveWorker remove worker from the stack
     RemoveWorker(ctx context.Context, wb WorkerBase) error
 }

 // workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator func(args ...interface{}) (*SyncWorker, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
+func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
     // todo check if events not nil
     ww := &WorkersWatcher{
         stack: NewWorkersStack(),
@@ -106,11 +106,10 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase)
             return err
         }
         ww.stack.Push(sw)
+        ww.watch(sw)
+
         go func(swc WorkerBase) {
-            //ww.mutex.Lock()
-            ww.watch(&swc)
-            ww.wait(ctx, &swc)
-            //ww.mutex.Unlock()
+            ww.wait(ctx, swc)
         }(sw)
     }
     return nil
@@ -146,14 +145,14 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
                 if w == nil {
                     continue
                 }
-                ww.actualNumWorkers--
+                ww.decreaseNumOfActualWorkers()
                 return w, nil
             case <-tout.C:
                 return nil, errors.New("no free stack")
             }
         }
     }
-    ww.actualNumWorkers--
+    ww.decreaseNumOfActualWorkers()
     return w, nil
 }
@@ -163,9 +162,9 @@ func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
     if err != nil {
         return err
     }
-    ww.addToWatch(*sw)
+    ww.addToWatch(sw)
     ww.stack.mutex.Unlock()
-    ww.PushWorker(*sw)
+    ww.PushWorker(sw)
     return nil
 }
@@ -202,9 +201,7 @@ func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
 }

 func (ww *WorkersWatcher) ReduceWorkersCount() {
-    ww.mutex.Unlock()
-    ww.actualNumWorkers--
-    ww.mutex.Lock()
+    ww.decreaseNumOfActualWorkers()
 }

 // Destroy all underlying stack (but let them to complete the task)
@@ -217,9 +214,12 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
     for {
         select {
         case <-tt.C:
+            ww.stack.mutex.Lock()
             if len(ww.stack.workers) != int(ww.actualNumWorkers) {
+                ww.stack.mutex.Unlock()
                 continue
             }
+            ww.stack.mutex.Unlock()
             // unnecessary mutex, but
             // just to make sure. All stack at this moment are in the stack
             // Pop operation is blocked, push can't be done, since it's not possible to pop
@@ -238,53 +238,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
 }

 // Warning, this is O(n) operation
-func (ww *WorkersWatcher) WorkersList(ctx context.Context) []WorkerBase {
+func (ww *WorkersWatcher) WorkersList() []WorkerBase {
     return ww.stack.workers
 }

-func (ww *WorkersWatcher) wait(ctx context.Context, w *WorkerBase) {
-    err := (*w).Wait(ctx)
+func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) {
+    err := w.Wait(ctx)
     if err != nil {
         ww.events <- PoolEvent{Payload: WorkerEvent{
             Event:   EventWorkerError,
-            Worker:  *w,
+            Worker:  w,
             Payload: err,
         }}
     }
     // If not destroyed, reallocate
-    if (*w).State().Value() != StateDestroyed {
-        pid := (*w).Pid()
+    if w.State().Value() != StateDestroyed {
+        pid := w.Pid()
+        ww.stack.mutex.Lock()
         for i := 0; i < len(ww.stack.workers); i++ {
             // worker in the stack, reallocating
             if ww.stack.workers[i].Pid() == pid {
-                ww.stack.mutex.Lock()
                 ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
-                ww.decreaseNumOfActualWorkers()
-                ww.stack.mutex.Unlock()
                 err = ww.AllocateNew(ctx)
                 if err != nil {
                     ww.events <- PoolEvent{Payload: WorkerEvent{
                         Event:   EventWorkerError,
-                        Worker:  *w,
+                        Worker:  w,
                         Payload: err,
                     }}
                     return
                 }
+                ww.events <- PoolEvent{Payload: WorkerEvent{
+                    Event:   EventWorkerConstruct,
+                    Worker:  nil,
+                    Payload: nil,
+                }}
                 return
             }
         }
+        ww.stack.mutex.Unlock()
         // worker not in the stack (not returned), forget and allocate new
         err = ww.AllocateNew(ctx)
         if err != nil {
             ww.events <- PoolEvent{Payload: WorkerEvent{
                 Event:   EventWorkerError,
-                Worker:  *w,
+                Worker:  w,
                 Payload: err,
             }}
             return
         }
+        ww.events <- PoolEvent{Payload: WorkerEvent{
+            Event:   EventWorkerConstruct,
+            Worker:  nil,
+            Payload: nil,
+        }}
     }
     return
 }
@@ -293,30 +302,21 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
     ww.mutex.Lock()
     defer ww.mutex.Unlock()
     go func() {
-        ww.wait(context.Background(), &wb)
+        ww.wait(context.Background(), wb)
     }()
 }

-func (ww *WorkersWatcher) reallocate(wb *WorkerBase) error {
-    sw, err := ww.allocator()
-    if err != nil {
-        return err
-    }
-    *wb = *sw
-    return nil
-}
-
 func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
     ww.mutex.Lock()
     ww.actualNumWorkers--
     ww.mutex.Unlock()
 }

-func (ww *WorkersWatcher) watch(swc *WorkerBase) {
+func (ww *WorkersWatcher) watch(swc WorkerBase) {
     // todo make event to stop function
     go func() {
         select {
-        case ev := <-(*swc).Events():
+        case ev := <-swc.Events():
             ww.events <- PoolEvent{Payload: ev}
         }
     }()