summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-27 15:50:57 +0300
committerGitHub <[email protected]>2020-10-27 15:50:57 +0300
commit105bde0e0c1a7c133d1daa10603ca5ce9a9ade4d (patch)
tree0d4664ef76ff6515fa965690a79dc69604eb3849
parent91cf918b30938129609323ded53e190385e019a6 (diff)
parent2176584129e493e08aed158bc050070d520ee183 (diff)
Merge pull request #376 from spiral/feature/lazy-load
Feature/lazy load
-rwxr-xr-xerrors.go6
-rwxr-xr-xerrors_test.go2
-rwxr-xr-xpool.go3
-rwxr-xr-xstatic_pool.go92
-rwxr-xr-xstatic_pool_test.go41
-rwxr-xr-xsync_worker.go12
-rwxr-xr-xsync_worker_test.go2
-rwxr-xr-xworker_watcher.go2
8 files changed, 112 insertions, 48 deletions
diff --git a/errors.go b/errors.go
index 52356549..7c91a92b 100755
--- a/errors.go
+++ b/errors.go
@@ -1,11 +1,11 @@
package roadrunner
-// JobError is job level error (no WorkerProcess halt), wraps at top
+// ExecError is job level error (no WorkerProcess halt), wraps at top
// of error context
-type JobError []byte
+type ExecError []byte
// Error converts error context to string
-func (te JobError) Error() string {
+func (te ExecError) Error() string {
return string(te)
}
diff --git a/errors_test.go b/errors_test.go
index 75a86840..86ab908d 100755
--- a/errors_test.go
+++ b/errors_test.go
@@ -8,7 +8,7 @@ import (
)
func Test_JobError_Error(t *testing.T) {
- e := JobError([]byte("error"))
+ e := ExecError([]byte("error"))
assert.Equal(t, "error", e.Error())
}
diff --git a/pool.go b/pool.go
index bc57bcbd..aca1b340 100755
--- a/pool.go
+++ b/pool.go
@@ -66,6 +66,9 @@ type Pool interface {
// Configures the pool behaviour.
type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers int64
diff --git a/static_pool.go b/static_pool.go
index 4ecbdd41..6f247d9e 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -43,6 +43,11 @@ type StaticPool struct {
func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
cfg.InitDefaults()
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
p := &StaticPool{
cfg: cfg,
cmd: cmd,
@@ -82,26 +87,30 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
}
// AddListener connects event listener to the pool.
-func (p *StaticPool) AddListener(listener util.EventListener) {
- p.events.AddListener(listener)
+func (sp *StaticPool) AddListener(listener util.EventListener) {
+ sp.events.AddListener(listener)
}
// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) GetConfig() Config {
- return p.cfg
+func (sp *StaticPool) GetConfig() Config {
+ return sp.cfg
}
// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []WorkerBase) {
- return p.ww.WorkersList()
+func (sp *StaticPool) Workers() (workers []WorkerBase) {
+ return sp.ww.WorkersList()
}
-func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
- return p.ww.RemoveWorker(ctx, wb)
+func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+ return sp.ww.RemoveWorker(ctx, wb)
}
-func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
- w, err := p.ww.GetFreeWorker(context.Background())
+func (sp *StaticPool) Exec(p Payload) (Payload, error) {
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+
+ w, err := sp.ww.GetFreeWorker(context.Background())
if err != nil && errors.Is(err, ErrWatcherStopped) {
return EmptyPayload, ErrWatcherStopped
} else if err != nil {
@@ -110,30 +119,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
sw := w.(SyncWorker)
- rsp, err := sw.Exec(rqs)
+ rsp, err := sw.Exec(p)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(bCtx)
+ if _, jobError := err.(ExecError); jobError {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(bCtx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
} else {
- p.ww.PushWorker(w)
+ sp.ww.PushWorker(w)
}
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
- p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -148,25 +157,38 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.Exec(rqs)
+ return sp.Exec(p)
}
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(bCtx)
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
return EmptyPayload, err
}
} else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
+ sp.ww.PushWorker(w)
}
return rsp, nil
}
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+ sw, err := sp.ww.allocator()
+ if err != nil {
+ return EmptyPayload, err
+ }
+
+ r, err := sw.(SyncWorker).Exec(p)
+
+ if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
// Exec one task with given payload and context, returns result or error.
// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
// // todo: why TODO passed here?
@@ -200,7 +222,7 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
// }
//
// // soft job errors are allowed
-// if _, jobError := err.(JobError); jobError {
+// if _, jobError := err.(ExecError); jobError {
// p.ww.PushWorker(w)
// return EmptyPayload, err
// }
@@ -239,18 +261,18 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
// }
// Destroy all underlying stack (but let them to complete the task).
-func (p *StaticPool) Destroy(ctx context.Context) {
- p.ww.Destroy(ctx)
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
}
// allocate required number of stack
-func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
var workers []WorkerBase
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout)
- w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
return nil, err
@@ -261,11 +283,11 @@ func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]W
return workers, nil
}
-func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(ctx)
+func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(ctx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
return err
}
}
diff --git a/static_pool_test.go b/static_pool_test.go
index ec80e92a..b75bd0bf 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -152,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, JobError{}, err)
+ assert.IsType(t, ExecError{}, err)
assert.Equal(t, "hello", err.Error())
}
@@ -301,6 +301,45 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
}
}
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
+ NewPipeFactory(),
+ Config{
+ Debug: true,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ assert.Len(t, p.Workers(), 0)
+
+ var lastPID string
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
+
+ assert.Len(t, p.Workers(), 0)
+
+ for i := 0; i < 10; i++ {
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
diff --git a/sync_worker.go b/sync_worker.go
index d7c15e88..2f3eb1e4 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -18,7 +18,7 @@ type SyncWorker interface {
WorkerBase
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
- Exec(rqs Payload) (Payload, error)
+ Exec(p Payload) (Payload, error)
}
type syncWorker struct {
@@ -47,7 +47,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(JobError); !ok {
+ if _, ok := err.(ExecError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -60,13 +60,13 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
return rsp, nil
}
-func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
// two things; todo: merge
- if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
+ if err := sendControl(tw.w.Relay(), p.Context); err != nil {
return EmptyPayload, errors.Wrap(err, "header error")
}
- if err := tw.w.Relay().Send(rqs.Body, 0); err != nil {
+ if err := tw.w.Relay().Send(p.Body, 0); err != nil {
return EmptyPayload, errors.Wrap(err, "sender error")
}
@@ -83,7 +83,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, JobError(rsp.Context)
+ return EmptyPayload, ExecError(rsp.Context)
}
// add streaming support :)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 7f969283..1bc2deb1 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -206,7 +206,7 @@ func Test_Error(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, JobError{}, err)
+ assert.IsType(t, ExecError{}, err)
assert.Equal(t, "hello", err.Error())
}
diff --git a/worker_watcher.go b/worker_watcher.go
index 25c88a1a..5ae54024 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -19,7 +19,7 @@ type Stack struct {
func NewWorkersStack() *Stack {
return &Stack{
- workers: make([]WorkerBase, 0, 12),
+ workers: make([]WorkerBase, 0),
}
}