summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpkg/worker/worker.go2
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go21
-rw-r--r--plugins/server/plugin.go2
3 files changed, 12 insertions, 13 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 2044d0e7..fa74e7b5 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -194,7 +194,7 @@ func (w *Process) Stop() error {
}
// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Does not waits for process completion!
+// error log from the stderr. Does not wait for process completion!
func (w *Process) Kill() error {
if w.State().Value() == StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index bdd91423..8ab9f664 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -77,11 +77,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
// thread safe operation
w, err := ww.container.Pop(ctx)
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
if err != nil {
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
return nil, errors.E(op, err)
}
@@ -97,9 +97,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
// try to continuously get free one
for {
w, err = ww.container.Pop(ctx)
-
- if errors.Is(errors.WatcherStopped, err) {
- return nil, errors.E(op, errors.WatcherStopped)
+ if err != nil {
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ return nil, errors.E(op, err)
}
if err != nil {
@@ -237,11 +239,8 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
ww.Unlock()
continue
}
- ww.Unlock()
- // unnecessary mutex, but
- // just to make sure. All container at this moment are in the container
+ // All container at this moment are in the container
// Pop operation is blocked, push can't be done, since it's not possible to pop
- ww.Lock()
for i := 0; i < len(ww.workers); i++ {
ww.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 16e3bd8c..5f5f2df9 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -216,7 +216,7 @@ func (server *Plugin) collectPoolEvents(event interface{}) {
case events.EventMaxMemory:
server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
case events.EventNoFreeWorkers:
- server.log.Warn("no free workers in pool", "error", we.Payload.(error).Error())
+ server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error())
case events.EventPoolError:
server.log.Error("pool error", "error", we.Payload.(error).Error())
case events.EventSupervisorError: