summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-19 09:49:16 +0300
committerValery Piashchynski <[email protected]>2021-07-19 09:49:16 +0300
commit02fc3664f4ad97e03c8f3a641e7322362f78721c (patch)
treec5cccffde3c9d087ac02846c593a63dfe38de98f /pkg/worker_watcher
parent978451159855c155b2aecdca3c86f47e5c6dd1ff (diff)
Worker watcher interface update.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/container/queue/queue.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go15
2 files changed, 12 insertions, 7 deletions
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
index 4f611bbe..edf81d60 100644
--- a/pkg/worker_watcher/container/queue/queue.go
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -91,7 +91,9 @@ func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
return w, nil
}
-func (q *Queue) Remove(_ int64) {}
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
func (q *Queue) Destroy() {}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 6e343fff..ca026383 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -18,9 +18,12 @@ type Vector interface {
// Pop used to get worker from the vector
Pop(ctx context.Context) (worker.BaseProcess, error)
// Remove worker with provided pid
- Remove(pid int64) // TODO replace
+ Remove(pid int64)
// Destroy used to stop releasing the workers
Destroy()
+
+ // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
}
type workerWatcher struct {
@@ -63,8 +66,8 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+// Take is not a thread safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
@@ -141,7 +144,7 @@ func (ww *workerWatcher) Allocate() error {
// unlock Allocate mutex
ww.Unlock()
// push the worker to the container
- ww.Push(sw)
+ ww.Release(sw)
return nil
}
@@ -164,8 +167,8 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
}
}
-// Push O(1) operation
-func (ww *workerWatcher) Push(w worker.BaseProcess) {
+// Release O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
ww.container.Push(w)