summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/worker_watcher
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/interface.go30
-rw-r--r--pkg/worker_watcher/stack.go18
-rw-r--r--pkg/worker_watcher/stack_test.go53
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go22
4 files changed, 84 insertions, 39 deletions
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
new file mode 100644
index 00000000..13991541
--- /dev/null
+++ b/pkg/worker_watcher/interface.go
@@ -0,0 +1,30 @@
+package worker_watcher //nolint:golint,stylecheck
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Watcher interface {
+ // AddToWatch used to add stack to wait its state
+ AddToWatch(workers []worker.SyncWorker) error
+
+ // GetFreeWorker provide first free worker
+ GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error)
+
+ // PutWorker enqueues worker back
+ PushWorker(w worker.SyncWorker)
+
+ // AllocateNew used to allocate new worker and put in into the WorkerWatcher
+ AllocateNew() error
+
+ // Destroy destroys the underlying stack
+ Destroy(ctx context.Context)
+
+ // WorkersList return all stack w/o removing it from internal storage
+ WorkersList() []*worker.SyncWorkerImpl
+
+ // RemoveWorker remove worker from the stack
+ RemoveWorker(wb worker.SyncWorker) error
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index c87e8b65..2d23d0e9 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,12 +5,12 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []worker.BaseProcess
+ workers []*worker.SyncWorkerImpl
mutex sync.RWMutex
destroy bool
actualNumOfWorkers uint64
@@ -20,7 +20,7 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]worker.BaseProcess, 0, w),
+ workers: make([]*worker.SyncWorkerImpl, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
@@ -39,7 +39,7 @@ func (stack *Stack) Push(w worker.BaseProcess) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w)
+ stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
}
func (stack *Stack) IsEmpty() bool {
@@ -48,7 +48,7 @@ func (stack *Stack) IsEmpty() bool {
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
@@ -85,13 +85,15 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.BaseProcess {
+func (stack *Stack) Workers() []*worker.SyncWorkerImpl {
stack.mutex.Lock()
defer stack.mutex.Unlock()
- workersCopy := make([]worker.BaseProcess, 0, 1)
+ workersCopy := make([]*worker.SyncWorkerImpl, 0, 1)
// copy
for _, v := range stack.workers {
- workersCopy = append(workersCopy, v)
+ if v != nil {
+ workersCopy = append(workersCopy, v)
+ }
}
return workersCopy
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 86af2043..5287a6dc 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -5,24 +5,25 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []worker.BaseProcess{}, stack.workers)
+ assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
stack := NewWorkersStack(1)
- w, err := workerImpl.InitBaseWorker(&exec.Cmd{})
+ w, err := worker.InitBaseWorker(&exec.Cmd{})
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
}
@@ -30,10 +31,12 @@ func TestStack_Pop(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
_, _ = stack.Pop()
@@ -43,12 +46,14 @@ func TestStack_Pop(t *testing.T) {
func TestStack_FindAndRemoveByPid(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+
+ stack.Push(sw)
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.FindAndRemoveByPid(w.Pid())
@@ -59,10 +64,12 @@ func TestStack_IsEmpty(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
assert.Equal(t, false, stack.IsEmpty())
@@ -71,11 +78,12 @@ func TestStack_IsEmpty(t *testing.T) {
func TestStack_Workers(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
wrks := stack.Workers()
assert.Equal(t, 1, len(wrks))
@@ -85,11 +93,13 @@ func TestStack_Workers(t *testing.T) {
func TestStack_Reset(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
stack.Reset()
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
@@ -98,11 +108,13 @@ func TestStack_Reset(t *testing.T) {
func TestStack_Destroy(t *testing.T) {
stack := NewWorkersStack(1)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+
stack.Destroy(context.Background())
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
}
@@ -110,12 +122,13 @@ func TestStack_Destroy(t *testing.T) {
func TestStack_DestroyWithWait(t *testing.T) {
stack := NewWorkersStack(2)
cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd)
assert.NoError(t, err)
assert.NoError(t, w.Start())
- stack.Push(w)
- stack.Push(w)
+ sw := worker.From(w)
+ stack.Push(sw)
+ stack.Push(sw)
assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
go func() {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index b0d39165..f87bd021 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -5,13 +5,13 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
)
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
ww := &workerWatcher{
stack: NewWorkersStack(uint64(numWorkers)),
allocator: allocator,
@@ -28,18 +28,18 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
+func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
- go func(swc worker.BaseProcess) {
+ go func(swc worker.SyncWorker) {
ww.wait(swc)
}(workers[i])
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error {
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
+func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.BaseProcess) {
+func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.BaseProcess {
+func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl {
return ww.stack.Workers()
}
-func (ww *workerWatcher) wait(w worker.BaseProcess) {
+func (ww *workerWatcher) wait(w worker.SyncWorker) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
@@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
+func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
go func() {
ww.wait(wb)
}()