summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 16:14:37 +0300
committerValery Piashchynski <[email protected]>2021-07-10 16:14:37 +0300
commitcb2665d93ad7abe1ab30508ff0e2bd4d0bc379ea (patch)
treed434861d00919985e9335c79336695cd3085bdcb
parent453eb10b436925ef91b1206e795e581e6293d132 (diff)
Move interfaces to its consumers
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/pool/interface.go24
-rwxr-xr-xpkg/pool/static_pool.go2
-rw-r--r--pkg/worker_watcher/container/interface.go17
-rw-r--r--pkg/worker_watcher/container/vec.go2
-rw-r--r--pkg/worker_watcher/interface.go30
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go4
6 files changed, 36 insertions, 43 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index c22fbbd3..bbf7653e 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// ExecWithContext executes task with context which is used with timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}
+
+// Watcher is an interface for the Sync workers lifecycle
+type Watcher interface {
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
+
+ // Get provide first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
+
+ // Push enqueues worker back
+ Push(w worker.BaseProcess)
+
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
+
+ // Destroy destroys the underlying container
+ Destroy(ctx context.Context)
+
+ // List return all container w/o removing it from internal storage
+ List() []worker.BaseProcess
+
+ // Remove will remove worker from the container
+ Remove(wb worker.BaseProcess)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 1c149c51..f2f19795 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww workerWatcher.Watcher
+ ww Watcher
// allocate new worker
allocator worker.Allocator
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
deleted file mode 100644
index e10ecdae..00000000
--- a/pkg/worker_watcher/container/interface.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package container
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Vector interface represents vector container
-type Vector interface {
- // Enqueue used to put worker to the vector
- Enqueue(worker.BaseProcess)
- // Dequeue used to get worker from the vector
- Dequeue(ctx context.Context) (worker.BaseProcess, error)
- // Destroy used to stop releasing the workers
- Destroy()
-}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index b9150c43..24b5fa6d 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -13,7 +13,7 @@ type Vec struct {
workers chan worker.BaseProcess
}
-func NewVector(initialNumOfWorkers uint64) Vector {
+func NewVector(initialNumOfWorkers uint64) *Vec {
vec := &Vec{
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 29fa3640..8fa88fe8 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -6,26 +6,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
-// Watcher is an interface for the Sync workers lifecycle
-type Watcher interface {
- // Watch used to add workers to the container
- Watch(workers []worker.BaseProcess) error
-
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
-
- // Push enqueues worker back
- Push(w worker.BaseProcess)
-
- // Allocate - allocates new worker and put it into the WorkerWatcher
- Allocate() error
-
- // Destroy destroys the underlying container
- Destroy(ctx context.Context)
-
- // List return all container w/o removing it from internal storage
- List() []worker.BaseProcess
-
- // Remove will remove worker from the container
- Remove(wb worker.BaseProcess)
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Destroy used to stop releasing the workers
+ Destroy()
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f82de958..e0dae7f6 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -12,7 +12,7 @@ import (
)
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +26,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events
type workerWatcher struct {
sync.RWMutex
- container container.Vector
+ container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess