diff options
author | Valery Piashchynski <[email protected]> | 2021-01-05 17:37:17 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-05 17:37:17 +0300 |
commit | 13b01ccaba1eedeb99d37842ec8f2019d2625187 (patch) | |
tree | c645c240336666fa63d70ed2703a78df828c597f /pkg | |
parent | 877b0ed461c7d5e1de87b7561f414aeb236cf3ec (diff) |
Finish implementation of the KV
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 3 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 2 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 6 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 4 |
5 files changed, 8 insertions, 9 deletions
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index dca09375..a2731294 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -46,7 +46,6 @@ func Test_Kill(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - // TODO changed from stopped, discuss assert.Equal(t, internal.StateErrored, w.State().Value()) }() @@ -465,7 +464,7 @@ func Test_Error(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - if errors.Is(errors.ErrSoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") } assert.Contains(t, err.Error(), "hello") diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 808e7d35..bb53e121 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -234,7 +234,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed - if errors.Is(errors.ErrSoftJob, err) { + if errors.Is(errors.SoftJob, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index acdd6ab7..53d6b191 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -158,7 +158,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - if errors.Is(errors.ErrSoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.Exec") } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 13212cc6..6a945cf4 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -43,7 +43,7 @@ func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.ErrSoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } @@ -90,7 +90,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose - if errors.Is(errors.ErrSoftJob, err) == false { + if errors.Is(errors.SoftJob, err) == false { tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } @@ -168,7 +168,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { flags := frameR.ReadFlags() if flags&byte(frame.ERROR) != byte(0) { - return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 5c0882b0..348f0459 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -178,7 +178,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, // thread safe operation w, stop := ww.stack.Pop() if stop { - return nil, errors.E(op, errors.ErrWatcherStopped) + return nil, errors.E(op, errors.WatcherStopped) } // handle worker remove state @@ -198,7 +198,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, default: w, stop = ww.stack.Pop() if stop { - return nil, errors.E(op, errors.ErrWatcherStopped) + return nil, errors.E(op, errors.WatcherStopped) } if w == nil { continue |