diff options
author | Valery Piashchynski <[email protected]> | 2021-01-19 16:47:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-19 16:47:15 +0300 |
commit | 26f9d35e18ef79d79a5609c6c68f1b6ad38c7aed (patch) | |
tree | dd6224660ffd0dcefe2332807203ee1eaf6697b4 /pkg | |
parent | 75ebbaac89ce8ebc3ab8de47b16e137844cfcd8a (diff) |
Uniform all errors operations
Add new ExecTTL event
Update tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 8 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 6 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 6 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 11 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 2 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 8 |
7 files changed, 26 insertions, 19 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index c36c13e2..b656eff8 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -32,7 +32,7 @@ type SpawnResult struct { // method Wait() must be handled on level above. func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { c := make(chan SpawnResult) - const op = errors.Op("spawn worker with context") + const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { @@ -114,7 +114,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("spawn worker") + const op = errors.Op("factory_spawn_worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { return nil, errors.E(op, err) diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 7cac7b4d..438f936f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -54,7 +54,7 @@ type StaticPool struct { // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { - const op = errors.Op("static pool initialize") + const op = errors.Op("static_pool_initialize") if factory == nil { return nil, errors.E(op, errors.Str("no factory initialized")) } @@ -174,7 +174,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec with context") + const op = errors.Op("static_pool_exec_with_context") ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxGetFree, op) @@ -233,6 +233,10 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") + // just push event if on any stage was timeout error + if errors.Is(errors.ExecTTL, err) { + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + } // soft job errors are allowed if errors.Is(errors.SoftJob, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 07fa7019..19cda759 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -51,7 +51,7 @@ type ttlExec struct { } func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec_supervised") + const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) } @@ -89,7 +89,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised exec") + const op = errors.Op("supervised_exec") rsp, err := sp.pool.Exec(p) if err != nil { return payload.Payload{}, errors.E(op, err) @@ -139,7 +139,7 @@ func (sp *supervised) Stop() { func (sp *supervised) control() { now := time.Now() - const op = errors.Op("supervised pool control tick") + const op = errors.Op("supervised_pool_control_tick") // THIS IS A COPY OF WORKERS workers := sp.pool.Workers() diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index ff882389..8f99ff73 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -86,7 +86,7 @@ type socketSpawn struct { // SpawnWorker creates Process and connects it to appropriate relay or returns error func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("spawn_worker_with_context") + const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { ctx, cancel := context.WithTimeout(ctx, f.tout) @@ -146,7 +146,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (worker.BaseProcess, error) { - const op = errors.Op("spawn_worker") + const op = errors.Op("factory_spawn_worker") w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { return nil, err @@ -201,7 +201,7 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess } func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { - const op = errors.Op("find_relay") + const op = errors.Op("factory_find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) for { diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 6a945cf4..8314c039 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -27,7 +27,7 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) { // Exec payload without TTL timeout. func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("sync worker Exec") + const op = errors.Op("sync_worker_exec") if len(p.Body) == 0 && len(p.Context) == 0 { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } @@ -63,7 +63,7 @@ type wexec struct { // Exec payload without TTL timeout. func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { - const op = errors.Op("ExecWithTimeout") + const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) go func() { @@ -111,12 +111,15 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p }() select { + // exec TTL reached case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { + // append timeout error + err = multierr.Append(err, errors.E(op, errors.ExecTTL)) return payload.Payload{}, multierr.Append(err, ctx.Err()) } - return payload.Payload{}, ctx.Err() + return payload.Payload{}, errors.E(op, errors.ExecTTL, ctx.Err()) case res := <-c: if res.err != nil { return payload.Payload{}, res.err @@ -126,7 +129,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p } func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec pl") + const op = errors.Op("sync_worker_exec_payload") fr := frame.NewFrame() fr.WriteVersion(frame.VERSION_1) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 493882a8..aef7f2b0 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -188,7 +188,7 @@ func (w *Process) Start() error { // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. func (w *Process) Wait() error { - const op = errors.Op("worker process wait") + const op = errors.Op("process_wait") err := multierr.Combine(w.cmd.Wait()) if w.State().Value() == internal.StateDestroyed { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index bf1f2435..b0d39165 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -40,7 +40,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { } func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { - const op = errors.Op("GetFreeWorker") + const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() if stop { @@ -81,7 +81,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, func (ww *workerWatcher) AllocateNew() error { ww.stack.mutex.Lock() - const op = errors.Op("allocate new worker") + const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { return errors.E(op, errors.WorkerAllocate, err) @@ -98,7 +98,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { ww.mutex.Lock() defer ww.mutex.Unlock() - const op = errors.Op("remove worker") + const op = errors.Op("worker_watcher_remove_worker") pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { @@ -132,7 +132,7 @@ func (ww *workerWatcher) WorkersList() []worker.BaseProcess { } func (ww *workerWatcher) wait(w worker.BaseProcess) { - const op = errors.Op("process wait") + const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { ww.events.Push(events.WorkerEvent{ |