summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/pool/interface.go8
-rwxr-xr-xpkg/pool/static_pool.go18
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rw-r--r--pkg/worker_watcher/container/queue/queue.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go15
-rw-r--r--plugins/jobs/rpc.go2
-rw-r--r--tests/plugins/http/http_plugin_test.go2
7 files changed, 30 insertions, 25 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index bbf7653e..a040d27a 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -33,11 +33,11 @@ type Watcher interface {
// Watch used to add workers to the container
Watch(workers []worker.BaseProcess) error
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
+ // Take takes the first free worker
+ Take(ctx context.Context) (worker.BaseProcess, error)
- // Push enqueues worker back
- Push(w worker.BaseProcess)
+ // Release releases the worker putting it back to the queue
+ Release(w worker.BaseProcess)
// Allocate - allocates new worker and put it into the WorkerWatcher
Allocate() error
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index f2f19795..037294ea 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -142,7 +142,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxGetFree, op)
+ w, err := sp.takeWorker(ctxGetFree, op)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -163,7 +163,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return rsp, nil
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -176,7 +176,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxAlloc, op)
+ w, err := sp.takeWorker(ctxAlloc, op)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
@@ -198,7 +198,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
}
// return worker back
- sp.ww.Push(w)
+ sp.ww.Release(w)
return rsp, nil
}
@@ -216,16 +216,16 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
- sp.ww.Push(w)
+ sp.ww.Release(w)
return
}
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
// Get function consumes context with timeout
- w, err := sp.ww.Get(ctxGetFree)
+ w, err := sp.ww.Take(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -265,7 +265,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
} else {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Push(w)
+ sp.ww.Release(w)
}
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 4b990dbe..aa6c7cfa 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -136,7 +136,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -156,7 +156,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
@@ -211,7 +211,7 @@ func (sp *supervised) control() { //nolint:gocognit
/*
worker at this point might be in the middle of request execution:
- ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
^
TTL Reached, state - invalid |
-----> Worker Stopped here
diff --git a/pkg/worker_watcher/container/queue/queue.go b/pkg/worker_watcher/container/queue/queue.go
index 4f611bbe..edf81d60 100644
--- a/pkg/worker_watcher/container/queue/queue.go
+++ b/pkg/worker_watcher/container/queue/queue.go
@@ -91,7 +91,9 @@ func (q *Queue) Pop(ctx context.Context) (worker.BaseProcess, error) {
return w, nil
}
-func (q *Queue) Remove(_ int64) {}
+func (q *Queue) Replace(oldPid int64, newWorker worker.BaseProcess) {
+
+}
func (q *Queue) Destroy() {}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 6e343fff..ca026383 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -18,9 +18,12 @@ type Vector interface {
// Pop used to get worker from the vector
Pop(ctx context.Context) (worker.BaseProcess, error)
// Remove worker with provided pid
- Remove(pid int64) // TODO replace
+ Remove(pid int64)
// Destroy used to stop releasing the workers
Destroy()
+
+ // TODO Add Replace method, and remove `Remove` method. Replace will do removal and allocation
+ // Replace(prevPid int64, newWorker worker.BaseProcess)
}
type workerWatcher struct {
@@ -63,8 +66,8 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+// Take is not a thread safe operation
+func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
@@ -141,7 +144,7 @@ func (ww *workerWatcher) Allocate() error {
// unlock Allocate mutex
ww.Unlock()
// push the worker to the container
- ww.Push(sw)
+ ww.Release(sw)
return nil
}
@@ -164,8 +167,8 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
}
}
-// Push O(1) operation
-func (ww *workerWatcher) Push(w worker.BaseProcess) {
+// Release O(1) operation
+func (ww *workerWatcher) Release(w worker.BaseProcess) {
switch w.State().Value() {
case worker.StateReady:
ww.container.Push(w)
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 4333c587..10158e74 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -15,7 +15,7 @@ type rpc struct {
/*
List of the RPC methods:
-1. Push - single job push
+1. Release - single job push
2. PushBatch - push job batch
3. Reset - managed by the Resetter plugin
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index c3949911..a6c75038 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -604,7 +604,7 @@ func sslPush(t *testing.T) {
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
- assert.Equal(t, "", r.Header.Get("Http2-Push"))
+ assert.Equal(t, "", r.Header.Get("Http2-Release"))
assert.NoError(t, err)
assert.Equal(t, 201, r.StatusCode)