summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-19 16:47:15 +0300
committerValery Piashchynski <[email protected]>2021-01-19 16:47:15 +0300
commit26f9d35e18ef79d79a5609c6c68f1b6ad38c7aed (patch)
treedd6224660ffd0dcefe2332807203ee1eaf6697b4 /pkg
parent75ebbaac89ce8ebc3ab8de47b16e137844cfcd8a (diff)
Uniform all errors operations
Add new ExecTTL event Update tests Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pool/static_pool.go8
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rwxr-xr-xpkg/socket/socket_factory.go6
-rwxr-xr-xpkg/worker/sync_worker.go11
-rwxr-xr-xpkg/worker/worker.go2
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
7 files changed, 26 insertions, 19 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index c36c13e2..b656eff8 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -32,7 +32,7 @@ type SpawnResult struct {
// method Wait() must be handled on level above.
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
- const op = errors.Op("spawn worker with context")
+ const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
@@ -114,7 +114,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
- const op = errors.Op("spawn worker")
+ const op = errors.Op("factory_spawn_worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
return nil, errors.E(op, err)
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 7cac7b4d..438f936f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -54,7 +54,7 @@ type StaticPool struct {
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
- const op = errors.Op("static pool initialize")
+ const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
}
@@ -174,7 +174,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec with context")
+ const op = errors.Op("static_pool_exec_with_context")
ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxGetFree, op)
@@ -233,6 +233,10 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
+ // just push event if on any stage was timeout error
+ if errors.Is(errors.ExecTTL, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ }
// soft job errors are allowed
if errors.Is(errors.SoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 07fa7019..19cda759 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -51,7 +51,7 @@ type ttlExec struct {
}
func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec_supervised")
+ const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
}
@@ -89,7 +89,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised exec")
+ const op = errors.Op("supervised_exec")
rsp, err := sp.pool.Exec(p)
if err != nil {
return payload.Payload{}, errors.E(op, err)
@@ -139,7 +139,7 @@ func (sp *supervised) Stop() {
func (sp *supervised) control() {
now := time.Now()
- const op = errors.Op("supervised pool control tick")
+ const op = errors.Op("supervised_pool_control_tick")
// THIS IS A COPY OF WORKERS
workers := sp.pool.Workers()
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index ff882389..8f99ff73 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -86,7 +86,7 @@ type socketSpawn struct {
// SpawnWorker creates Process and connects it to appropriate relay or returns error
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
- const op = errors.Op("spawn_worker_with_context")
+ const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
ctx, cancel := context.WithTimeout(ctx, f.tout)
@@ -146,7 +146,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) {
- const op = errors.Op("spawn_worker")
+ const op = errors.Op("factory_spawn_worker")
w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
return nil, err
@@ -201,7 +201,7 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
}
func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
- const op = errors.Op("find_relay")
+ const op = errors.Op("factory_find_relay")
// poll every 1ms for the relay
pollDone := time.NewTimer(f.tout)
for {
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 6a945cf4..8314c039 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -27,7 +27,7 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) {
// Exec payload without TTL timeout.
func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("sync worker Exec")
+ const op = errors.Op("sync_worker_exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
@@ -63,7 +63,7 @@ type wexec struct {
// Exec payload without TTL timeout.
func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithTimeout")
+ const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
go func() {
@@ -111,12 +111,15 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}()
select {
+ // exec TTL reached
case <-ctx.Done():
err := multierr.Combine(tw.Kill())
if err != nil {
+ // append timeout error
+ err = multierr.Append(err, errors.E(op, errors.ExecTTL))
return payload.Payload{}, multierr.Append(err, ctx.Err())
}
- return payload.Payload{}, ctx.Err()
+ return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err())
case res := <-c:
if res.err != nil {
return payload.Payload{}, res.err
@@ -126,7 +129,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
}
func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec pl")
+ const op = errors.Op("sync_worker_exec_payload")
fr := frame.NewFrame()
fr.WriteVersion(frame.VERSION_1)
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 493882a8..aef7f2b0 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -188,7 +188,7 @@ func (w *Process) Start() error {
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
func (w *Process) Wait() error {
- const op = errors.Op("worker process wait")
+ const op = errors.Op("process_wait")
err := multierr.Combine(w.cmd.Wait())
if w.State().Value() == internal.StateDestroyed {
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index bf1f2435..b0d39165 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -40,7 +40,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) {
- const op = errors.Op("GetFreeWorker")
+ const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
@@ -81,7 +81,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess,
func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Lock()
- const op = errors.Op("allocate new worker")
+ const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
@@ -98,7 +98,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
- const op = errors.Op("remove worker")
+ const op = errors.Op("worker_watcher_remove_worker")
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
@@ -132,7 +132,7 @@ func (ww *workerWatcher) WorkersList() []worker.BaseProcess {
}
func (ww *workerWatcher) wait(w worker.BaseProcess) {
- const op = errors.Op("process wait")
+ const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
ww.events.Push(events.WorkerEvent{