diff options
-rwxr-xr-x | pkg/worker/worker.go | 2 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 21 | ||||
-rw-r--r-- | plugins/server/plugin.go | 2 |
3 files changed, 12 insertions, 13 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2044d0e7..fa74e7b5 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -194,7 +194,7 @@ func (w *Process) Stop() error { } // Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not waits for process completion! +// error log from the stderr. Does not wait for process completion! func (w *Process) Kill() error { if w.State().Value() == StateDestroyed { err := w.cmd.Process.Signal(os.Kill) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index bdd91423..8ab9f664 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -77,11 +77,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { // thread safe operation w, err := ww.container.Pop(ctx) - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) - } - if err != nil { + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + return nil, errors.E(op, err) } @@ -97,9 +97,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { // try to continuously get free one for { w, err = ww.container.Pop(ctx) - - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) + if err != nil { + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + return nil, errors.E(op, err) } if err != nil { @@ -237,11 +239,8 @@ func (ww *workerWatcher) Destroy(_ context.Context) { ww.Unlock() continue } - ww.Unlock() - // unnecessary mutex, but - // just to make sure. All container at this moment are in the container + // All container at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.Lock() for i := 0; i < len(ww.workers); i++ { ww.workers[i].State().Set(worker.StateDestroyed) // kill the worker diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 16e3bd8c..5f5f2df9 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -216,7 +216,7 @@ func (server *Plugin) collectPoolEvents(event interface{}) { case events.EventMaxMemory: server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) case events.EventNoFreeWorkers: - server.log.Warn("no free workers in pool", "error", we.Payload.(error).Error()) + server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error()) case events.EventPoolError: server.log.Error("pool error", "error", we.Payload.(error).Error()) case events.EventSupervisorError: |