author     Valery Piashchynski <[email protected]>   2020-11-27 11:19:27 +0300
committer  GitHub <[email protected]>                 2020-11-27 11:19:27 +0300
commit     b5020bfce6b5362400cb9b578fe32c1a6ed5d61a (patch)
tree       902afaca9b225cfe9e3b498b97cc83dec13fcd9a
parent     46ae5dcc22d971b0f909bce23ec8fdef26811ed6 (diff)
parent     849a03b8ead6fe8e65ab1a1e5653a57c12502dd1 (diff)
Merge pull request #416 from spiral/feature/static_pool_trigger (tag: v2.0.0-alpha22)
Add new pool event: EventNoFreeWorkers
-rwxr-xr-x  go.mod                             2
-rwxr-xr-x  go.sum                             2
-rw-r--r--  plugins/http/tests/http_test.go    2
-rwxr-xr-x  pool.go                            5
-rwxr-xr-x  static_pool.go                    59
-rwxr-xr-x  static_pool_test.go                5
-rwxr-xr-x  supervisor_pool.go                12
-rwxr-xr-x  sync_worker.go                    14
-rwxr-xr-x  sync_worker_test.go               10
-rwxr-xr-x  worker_watcher.go                 36
10 files changed, 79 insertions, 68 deletions
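
This merge makes worker acquisition time-bounded: both Exec paths in static_pool.go now derive a context from cfg.AllocateTimeout, and when no worker frees up in time the pool pushes the new EventNoFreeWorkers event and returns an errors.NoFreeWorkers failure instead of blocking on a hardcoded 180-second ticker. Below is a minimal, self-contained sketch of that event flow; PoolEvent, EventNoFreeWorkers and the Push/listener pattern come from the diff, while the tiny eventsHandler type is illustrative and not the real util.EventsHandler.

```go
// Self-contained sketch of the event flow added in this merge. The
// eventsHandler below only mirrors the Push/listener pattern visible in the
// diff; it is not the library's real events implementation.
package main

import (
	"errors"
	"fmt"
)

type PoolEvent struct {
	Event   int64
	Payload interface{}
}

const (
	EventWorkerConstruct int64 = iota
	EventNoFreeWorkers
)

type eventsHandler struct {
	listeners []func(event interface{})
}

func (eh *eventsHandler) AddListener(l func(event interface{})) {
	eh.listeners = append(eh.listeners, l)
}

func (eh *eventsHandler) Push(e interface{}) {
	for _, l := range eh.listeners {
		l(e)
	}
}

func main() {
	events := &eventsHandler{}
	events.AddListener(func(event interface{}) {
		if pe, ok := event.(PoolEvent); ok && pe.Event == EventNoFreeWorkers {
			fmt.Println("pool saturated:", pe.Payload)
		}
	})

	// roughly what the pool does once the allocate timeout elapses
	events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.New("no free workers in the stack")})
}
```
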
diff --git a/go.mod b/go.mod
index ea794778..042819cb 100755
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
github.com/shirou/gopsutil v3.20.10+incompatible
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta20
- github.com/spiral/errors v1.0.4
+ github.com/spiral/errors v1.0.5
github.com/spiral/goridge/v2 v2.4.6
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
diff --git a/go.sum b/go.sum
index f5c4207f..da6b35bf 100755
--- a/go.sum
+++ b/go.sum
@@ -244,6 +244,8 @@ github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpd
github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o=
github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0=
github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
+github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 451566ca..ed98d17e 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -811,7 +811,7 @@ func TestHttpMiddleware(t *testing.T) {
wg.Add(1)
go func() {
- tt := time.NewTimer(time.Second * 10)
+ tt := time.NewTimer(time.Second * 15)
defer wg.Done()
for {
select {
diff --git a/pool.go b/pool.go
index 030637c4..3e38c4cb 100755
--- a/pool.go
+++ b/pool.go
@@ -30,6 +30,9 @@ const (
// EventSupervisorError triggered when supervisor can not complete work.
EventSupervisorError
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and the worker allocate timeout has elapsed
+ EventNoFreeWorkers
+
// todo: EventMaxMemory caused when worker consumes more memory than allowed.
EventMaxMemory
@@ -60,7 +63,7 @@ type Pool interface {
Workers() (workers []WorkerBase)
// Remove worker from the pool.
- RemoveWorker(ctx context.Context, worker WorkerBase) error
+ RemoveWorker(worker WorkerBase) error
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
diff --git a/static_pool.go b/static_pool.go
index d5511018..b626a499 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -82,7 +82,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
}
// put stack in the pool
- err = p.ww.AddToWatch(ctx, workers)
+ err = p.ww.AddToWatch(workers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -132,16 +132,18 @@ func (sp *StaticPool) Workers() (workers []WorkerBase) {
return sp.ww.WorkersList()
}
-func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
- return sp.ww.RemoveWorker(ctx, wb)
+func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+ return sp.ww.RemoveWorker(wb)
}
func (sp *StaticPool) Exec(p Payload) (Payload, error) {
- const op = errors.Op("Exec")
+ const op = errors.Op("exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
- w, err := sp.ww.GetFreeWorker(context.Background())
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -171,7 +173,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -189,14 +191,17 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
}
func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- const op = errors.Op("Exec with context")
- w, err := sp.ww.GetFreeWorker(context.Background())
+ const op = errors.Op("exec with context")
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
if err != nil {
return EmptyPayload, errors.E(op, err)
}
sw := w.(SyncWorker)
+ // apply all before functions
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
rqs = sp.before[i](rqs)
@@ -220,7 +225,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
}
if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
return EmptyPayload, errors.E(op, err)
}
@@ -228,6 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
sp.ww.PushWorker(sw)
}
+ // apply all after functions
if len(sp.after) > 0 {
for i := 0; i < len(sp.after); i++ {
rsp = sp.after[i](rqs, rsp)
@@ -237,6 +243,21 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
return rsp, nil
}
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+ // GetFreeWorker consumes a context bounded by the allocate timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, we could not get a worker from the stack within the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // any other non-nil error is returned as is
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
// Destroy all underlying stack (but let them to complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
@@ -246,11 +267,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w WorkerBase) (Payload, error) {
const op = errors.Op("error encoder")
// soft job errors are allowed
- if errors.Is(errors.Exec, err) {
+ if errors.Is(errors.ErrSoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew(bCtx)
+ err = sp.ww.AllocateNew()
if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)})
+ sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
}
w.State().Set(StateInvalid)
@@ -318,22 +339,10 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return nil, errors.E(op, errors.WorkerAllocate, err)
}
workers = append(workers, w)
cancel()
}
return workers, nil
}
-
-func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
- const op = errors.Op("check max jobs")
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew(ctx)
- if err != nil {
- sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
- return errors.E(op, err)
- }
- }
- return nil
-}
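
The key mechanical change in static_pool.go is that Exec and ExecWithContext now build a context bounded by cfg.AllocateTimeout and route acquisition through the new getWorker helper, which turns a deadline hit into an errors.NoFreeWorkers failure plus an EventNoFreeWorkers push. The sketch below reproduces that pattern with the standard library only; getFreeWorker and the string-typed "worker" are illustrative stand-ins for the real stack types.

```go
// Standard-library-only sketch of the acquisition pattern adopted above:
// bound the wait for a free worker by the configured allocate timeout and
// fail with a distinct error once the deadline passes.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNoFreeWorkers = errors.New("no free workers in the stack, timeout exceeded")

// getFreeWorker polls a worker source until it yields a worker or ctx expires.
func getFreeWorker(ctx context.Context, pop func() (string, bool)) (string, error) {
	for {
		select {
		case <-ctx.Done():
			return "", errNoFreeWorkers
		default:
			if w, ok := pop(); ok {
				return w, nil
			}
			time.Sleep(time.Millisecond) // avoid a tight busy loop while waiting
		}
	}
}

func main() {
	allocateTimeout := 50 * time.Millisecond // plays the role of cfg.AllocateTimeout
	ctx, cancel := context.WithTimeout(context.Background(), allocateTimeout)
	defer cancel()

	// an always-empty stack: the call gives up once the timeout elapses
	_, err := getFreeWorker(ctx, func() (string, bool) { return "", false })
	fmt.Println(err) // no free workers in the stack, timeout exceeded
}
```
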
diff --git a/static_pool_test.go b/static_pool_test.go
index e97e2034..2823cbc4 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -153,7 +153,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.Exec, err) == false {
+ if errors.Is(errors.ErrSoftJob, err) == false {
t.Fatal("error should be of type errors.Exec")
}
@@ -273,6 +273,9 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
},
)
assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
+ }
assert.Nil(t, p)
}
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 6fcb71e6..dfec5559 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -102,8 +102,8 @@ func (sp *supervisedPool) Workers() (workers []WorkerBase) {
return sp.pool.Workers()
}
-func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
- return sp.pool.RemoveWorker(ctx, worker)
+func (sp *supervisedPool) RemoveWorker(worker WorkerBase) error {
+ return sp.pool.RemoveWorker(worker)
}
func (sp *supervisedPool) Destroy(ctx context.Context) {
@@ -134,7 +134,6 @@ func (sp *supervisedPool) Stop() {
func (sp *supervisedPool) control() {
now := time.Now()
- ctx := context.TODO()
const op = errors.Op("supervised pool control tick")
// THIS IS A COPY OF WORKERS
@@ -152,7 +151,7 @@ func (sp *supervisedPool) control() {
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
- err = sp.pool.RemoveWorker(ctx, workers[i])
+ err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
@@ -162,13 +161,12 @@ func (sp *supervisedPool) control() {
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- err = sp.pool.RemoveWorker(ctx, workers[i])
+ err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
}
sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
-
continue
}
@@ -194,7 +192,7 @@ func (sp *supervisedPool) control() {
// maxWorkerIdle more than diff between now and last used
if sp.cfg.IdleTTL-uint64(res) <= 0 {
- err = sp.pool.RemoveWorker(ctx, workers[i])
+ err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
return
diff --git a/sync_worker.go b/sync_worker.go
index cd0f934e..7e4d21cc 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"time"
"github.com/spiral/errors"
@@ -36,12 +35,13 @@ func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
// Exec payload without TTL timeout.
func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+ const op = errors.Op("sync worker Exec")
if len(p.Body) == 0 && len(p.Context) == 0 {
- return EmptyPayload, fmt.Errorf("payload can not be empty")
+ return EmptyPayload, errors.E(op, errors.Str("payload can not be empty"))
}
if tw.w.State().Value() != StateReady {
- return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())
+ return EmptyPayload, errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()))
}
// set last used time
@@ -51,7 +51,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.Exec, err) == false {
+ if errors.Is(errors.ErrSoftJob, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -97,7 +97,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.Exec, err) == false {
+ if errors.Is(errors.ErrSoftJob, err) == false {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -152,11 +152,11 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
}
if !pr.HasFlag(goridge.PayloadControl) {
- return EmptyPayload, fmt.Errorf("malformed WorkerProcess response")
+ return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response"))
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context)))
+ return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context)))
}
// add streaming support :)
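
sync_worker.go also drops fmt-based error construction in favour of the github.com/spiral/errors conventions used throughout this commit: an errors.Op for context, an optional kind such as errors.ErrSoftJob, and errors.Is(kind, err) on the caller side. A small sketch of that convention follows; only primitives already shown in the diff are used, and the doExec wrapper is illustrative.

```go
// Sketch of the error-wrapping convention standardized in this commit.
// errors.Op, errors.E, errors.Str, errors.ErrSoftJob and errors.Is are the
// github.com/spiral/errors primitives used in the diff; doExec is made up.
package main

import (
	"fmt"

	"github.com/spiral/errors"
)

func doExec(empty bool) error {
	const op = errors.Op("sync worker Exec")
	if empty {
		return errors.E(op, errors.Str("payload can not be empty"))
	}
	// a failure reported by the worker itself is tagged as a soft job error,
	// so callers can tell it apart from transport or worker-state problems
	return errors.E(op, errors.ErrSoftJob, errors.Str("hello"))
}

func main() {
	err := doExec(false)
	fmt.Println(errors.Is(errors.ErrSoftJob, err)) // expected: true
	fmt.Println(err)                               // message includes "SoftJobError: hello"
}
```
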
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 69e6ece9..9786d709 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -70,7 +70,7 @@ func Test_BadPayload(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.Equal(t, "payload can not be empty", err.Error())
+ assert.Contains(t, err.Error(), "payload can not be empty")
}
func Test_NotStarted_String(t *testing.T) {
@@ -98,7 +98,7 @@ func Test_NotStarted_Exec(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.Equal(t, "WorkerProcess is not ready (inactive)", err.Error())
+ assert.Contains(t, err.Error(), "WorkerProcess is not ready (inactive)")
}
func Test_String(t *testing.T) {
@@ -215,10 +215,10 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- if errors.Is(errors.Exec, err) == false {
- t.Fatal("error should be of type errors.Exec")
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ t.Fatal("error should be of type errors.ErrSoftJob")
}
- assert.Contains(t, err.Error(), "exec payload: Exec: hello")
+ assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello")
}
func Test_NumExecs(t *testing.T) {
diff --git a/worker_watcher.go b/worker_watcher.go
index 3b83c8ff..8bc147d0 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -62,7 +62,7 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
- AddToWatch(ctx context.Context, workers []WorkerBase) error
+ AddToWatch(workers []WorkerBase) error
// GetFreeWorker provide first free worker
GetFreeWorker(ctx context.Context) (WorkerBase, error)
@@ -71,7 +71,7 @@ type WorkerWatcher interface {
PushWorker(w WorkerBase)
// AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew(ctx context.Context) error
+ AllocateNew() error
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
@@ -80,7 +80,7 @@ type WorkerWatcher interface {
WorkersList() []WorkerBase
// RemoveWorker remove worker from the stack
- RemoveWorker(ctx context.Context, wb WorkerBase) error
+ RemoveWorker(wb WorkerBase) error
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
@@ -105,7 +105,7 @@ type workerWatcher struct {
events util.EventsHandler
}
-func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error {
for i := 0; i < len(workers); i++ {
sw, err := NewSyncWorker(workers[i])
if err != nil {
@@ -115,14 +115,14 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
sw.AddListener(ww.events.Push)
go func(swc WorkerBase) {
- ww.wait(ctx, swc)
+ ww.wait(swc)
}(sw)
}
return nil
}
func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
- const op = errors.Op("get_free_worker")
+ const op = errors.Op("GetFreeWorker")
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
@@ -132,19 +132,15 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// handle worker remove state
// in this state worker is destroyed by supervisor
if w != nil && w.State().Value() == StateRemove {
- err := ww.RemoveWorker(ctx, w)
+ err := ww.RemoveWorker(w)
if err != nil {
return nil, err
}
// try to get next
return ww.GetFreeWorker(ctx)
}
-
// no free stack
if w == nil {
- // TODO allocate timeout
- tout := time.NewTicker(time.Second * 180)
- defer tout.Stop()
for {
select {
default:
@@ -157,8 +153,8 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
ww.ReduceWorkersCount()
return w, nil
- case <-tout.C:
- return nil, errors.E(op, errors.Str("no free workers in the stack"))
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceeded"))
}
}
}
@@ -167,12 +163,12 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
return w, nil
}
-func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
+func (ww *workerWatcher) AllocateNew() error {
ww.stack.mutex.Lock()
const op = errors.Op("allocate new worker")
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, err)
+ return errors.E(op, errors.WorkerAllocate, err)
}
ww.addToWatch(sw)
@@ -187,7 +183,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
return nil
}
-func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
ww.stack.mutex.Lock()
const op = errors.Op("remove worker")
defer ww.stack.mutex.Unlock()
@@ -275,7 +271,7 @@ func (ww *workerWatcher) WorkersList() []WorkerBase {
return workersCopy
}
-func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
+func (ww *workerWatcher) wait(w WorkerBase) {
const op = errors.Op("process wait")
err := w.Wait()
if err != nil {
@@ -300,7 +296,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
ww.ReduceWorkersCount()
ww.stack.mutex.Unlock()
- err = ww.AllocateNew(ctx)
+ err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
@@ -315,7 +311,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
ww.stack.mutex.Unlock()
// worker not in the stack (not returned), forget and allocate new
- err = ww.AllocateNew(ctx)
+ err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
@@ -329,6 +325,6 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
- ww.wait(context.Background(), wb)
+ ww.wait(wb)
}()
}
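
Because the worker watcher now honours the caller's context instead of a fixed 180-second ticker, a saturated pool surfaces quickly as an errors.NoFreeWorkers failure alongside the EventNoFreeWorkers event. The caller-side sketch below is an assumed usage, not part of this diff: execWithBackpressure and its back-off policy are illustrative, and only github.com/spiral/errors calls already shown above are used.

```go
// Caller-side sketch (assumed usage): distinguish the NoFreeWorkers kind so
// pool saturation can be handled as back-pressure rather than a failed job.
package main

import (
	"log"

	"github.com/spiral/errors"
)

func execWithBackpressure(execFn func() error) error {
	err := execFn()
	if err == nil {
		return nil
	}
	if errors.Is(errors.NoFreeWorkers, err) {
		// the pool could not hand out a worker within cfg.AllocateTimeout:
		// shed load or retry later instead of treating this as a job failure
		log.Println("no free workers, backing off:", err)
		return err
	}
	return err
}

func main() {
	// simulate a saturated pool
	saturated := func() error {
		return errors.E(errors.Op("exec"), errors.NoFreeWorkers, errors.Str("no free workers in the stack"))
	}
	_ = execWithBackpressure(saturated)
}
```
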