diff options
author | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-23 23:38:10 +0300 |
commit | 7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch) | |
tree | 3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker_watcher | |
parent | ee5d34abde7f3931bf939498eb7a8cb170232f4f (diff) |
interfaces folder deprecated
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/interface.go | 30 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 18 | ||||
-rw-r--r-- | pkg/worker_watcher/stack_test.go | 53 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 22 |
4 files changed, 84 insertions, 39 deletions
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go new file mode 100644 index 00000000..13991541 --- /dev/null +++ b/pkg/worker_watcher/interface.go @@ -0,0 +1,30 @@ +package worker_watcher //nolint:golint,stylecheck + +import ( + "context" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Watcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(workers []worker.SyncWorker) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) + + // PutWorker enqueues worker back + PushWorker(w worker.SyncWorker) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew() error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []*worker.SyncWorkerImpl + + // RemoveWorker remove worker from the stack + RemoveWorker(wb worker.SyncWorker) error +} diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index c87e8b65..2d23d0e9 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,12 +5,12 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { - workers []worker.BaseProcess + workers []*worker.SyncWorkerImpl mutex sync.RWMutex destroy bool actualNumOfWorkers uint64 @@ -20,7 +20,7 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]worker.BaseProcess, 0, w), + workers: make([]*worker.SyncWorkerImpl, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } @@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) { stack.mutex.Lock() defer stack.mutex.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) + stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) } func (stack *Stack) IsEmpty() bool { @@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool { return len(stack.workers) == 0 } -func (stack *Stack) Pop() (worker.BaseProcess, bool) { +func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() @@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.BaseProcess { +func (stack *Stack) Workers() []*worker.SyncWorkerImpl { stack.mutex.Lock() defer stack.mutex.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) + workersCopy := make([]*worker.SyncWorkerImpl, 0, 1) // copy for _, v := range stack.workers { - workersCopy = append(workersCopy, v) + if v != nil { + workersCopy = append(workersCopy, v) + } } return workersCopy diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 86af2043..5287a6dc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -5,24 +5,25 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/interfaces/worker" - workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) func TestNewWorkersStack(t *testing.T) { stack := NewWorkersStack(0) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) + assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers) } func TestStack_Push(t *testing.T) { stack := NewWorkersStack(1) - w, err := workerImpl.InitBaseWorker(&exec.Cmd{}) + w, err := worker.InitBaseWorker(&exec.Cmd{}) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) } @@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) _, _ = stack.Pop() @@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) { func TestStack_FindAndRemoveByPid(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + + stack.Push(sw) assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.FindAndRemoveByPid(w.Pid()) @@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) assert.Equal(t, false, stack.IsEmpty()) @@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) { func TestStack_Workers(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) wrks := stack.Workers() assert.Equal(t, 1, len(wrks)) @@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) { func TestStack_Reset(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.Reset() assert.Equal(t, uint64(0), stack.actualNumOfWorkers) @@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) { func TestStack_Destroy(t *testing.T) { stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } @@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) { func TestStack_DestroyWithWait(t *testing.T) { stack := NewWorkersStack(2) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) - stack.Push(w) - stack.Push(w) + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) assert.Equal(t, uint64(2), stack.actualNumOfWorkers) go func() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b0d39165..f87bd021 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,13 +5,13 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher { ww := &workerWatcher{ stack: NewWorkersStack(uint64(numWorkers)), allocator: allocator, @@ -28,18 +28,18 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.BaseProcess) { + go func(swc worker.SyncWorker) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error { return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.BaseProcess { +func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w worker.BaseProcess) { +func (ww *workerWatcher) wait(w worker.SyncWorker) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { @@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { go func() { ww.wait(wb) }() |