summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-11-02 15:53:48 +0300
committerValery Piashchynski <[email protected]>2021-11-02 15:53:48 +0300
commitc1c16ddb8717af5b19f45118e615241ac14a54d6 (patch)
treedeeea0f9ff6ad7f14d446ef283243d4f11b6c4f9
parente4f4c7ac0c47231c220a190eef5db5baf956fbe1 (diff)
Update events messagesv2.6.0-alpha.4
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xpool/static_pool.go16
-rwxr-xr-xworker_watcher/worker_watcher.go8
2 files changed, 12 insertions, 12 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index a5c98eaa..4ff57173 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -195,7 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
}
}
@@ -216,7 +216,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err)))
+ sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -236,12 +236,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err)))
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -262,7 +262,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -270,7 +270,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -294,7 +294,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
return sw, nil
}
}
@@ -316,7 +316,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
return nil, err
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index d425994e..949958ac 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -279,7 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
+ ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("worker stopped, error: %v", err)))
}
// remove worker
@@ -287,7 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
+ ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("worker destroyed, pid: %d", w.Pid())))
return
}
@@ -297,11 +297,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
+ ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("failed to allocate worker, error: %v", err)))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
- panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v, no workers in the pool", err)))
}
}
}