summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-05 17:37:17 +0300
committerValery Piashchynski <[email protected]>2021-01-05 17:37:17 +0300
commit13b01ccaba1eedeb99d37842ec8f2019d2625187 (patch)
treec645c240336666fa63d70ed2703a78df828c597f /pkg
parent877b0ed461c7d5e1de87b7561f414aeb236cf3ec (diff)
Finish implementation of the KV
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pipe/pipe_factory_test.go3
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rwxr-xr-xpkg/worker/sync_worker.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go4
5 files changed, 8 insertions, 9 deletions
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index dca09375..a2731294 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -46,7 +46,6 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- // TODO changed from stopped, discuss
assert.Equal(t, internal.StateErrored, w.State().Value())
}()
@@ -465,7 +464,7 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
}
assert.Contains(t, err.Error(), "hello")
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 808e7d35..bb53e121 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -234,7 +234,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
- if errors.Is(errors.ErrSoftJob, err) {
+ if errors.Is(errors.SoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index acdd6ab7..53d6b191 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -158,7 +158,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 13212cc6..6a945cf4 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -43,7 +43,7 @@ func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
@@ -90,7 +90,7 @@ func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (p
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.ErrSoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false {
tw.w.State().Set(internal.StateErrored)
tw.w.State().RegisterExec()
}
@@ -168,7 +168,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
flags := frameR.ReadFlags()
if flags&byte(frame.ERROR) != byte(0) {
- return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
+ return payload.Payload{}, errors.E(op, errors.SoftJob, errors.Str(string(frameR.Payload())))
}
options := frameR.ReadOptions()
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5c0882b0..348f0459 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -178,7 +178,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess,
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
- return nil, errors.E(op, errors.ErrWatcherStopped)
+ return nil, errors.E(op, errors.WatcherStopped)
}
// handle worker remove state
@@ -198,7 +198,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess,
default:
w, stop = ww.stack.Pop()
if stop {
- return nil, errors.E(op, errors.ErrWatcherStopped)
+ return nil, errors.E(op, errors.WatcherStopped)
}
if w == nil {
continue