summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-27 15:58:46 +0300
committerValery Piashchynski <[email protected]>2020-10-27 15:58:46 +0300
commitc9af916ae4d78334d348ed1ef7238206f3ecb7a1 (patch)
tree43c04f7c538ecbd0b29fc280d41df5d00f203436 /static_pool.go
parentfd60aff1cddf32b0c45d934fcf14b070df80adcf (diff)
parent105bde0e0c1a7c133d1daa10603ca5ce9a9ade4d (diff)
Merge remote-tracking branch 'origin/release_2.0' into feature/pool_supervisor
# Conflicts: # static_pool.go # sync_worker.go
Diffstat (limited to 'static_pool.go')
-rwxr-xr-xstatic_pool.go89
1 files changed, 55 insertions, 34 deletions
diff --git a/static_pool.go b/static_pool.go
index 3af933c3..7a5f6103 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -39,6 +39,11 @@ type StaticPool struct {
func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
cfg.InitDefaults()
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
p := &StaticPool{
cfg: cfg,
cmd: cmd,
@@ -82,27 +87,30 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
}
// AddListener connects event listener to the pool.
-func (p *StaticPool) AddListener(listener util.EventListener) {
- p.events.AddListener(listener)
+func (sp *StaticPool) AddListener(listener util.EventListener) {
+ sp.events.AddListener(listener)
}
// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) GetConfig() Config {
- return p.cfg
+func (sp *StaticPool) GetConfig() Config {
+ return sp.cfg
}
// Workers returns worker list associated with the pool.
-func (p *StaticPool) Workers() (workers []WorkerBase) {
- return p.ww.WorkersList()
+func (sp *StaticPool) Workers() (workers []WorkerBase) {
+ return sp.ww.WorkersList()
}
-func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
- return p.ww.RemoveWorker(ctx, wb)
+func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+ return sp.ww.RemoveWorker(ctx, wb)
}
-func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
+func (sp *StaticPool) Exec(p Payload) (Payload, error) {
const op = errors.Op("Exec")
- w, err := p.ww.GetFreeWorker(context.Background())
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ w, err := sp.ww.GetFreeWorker(context.Background())
if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
return EmptyPayload, errors.E(op, err)
} else if err != nil {
@@ -111,30 +119,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
sw := w.(SyncWorker)
- rsp, err := sw.Exec(rqs)
+ rsp, err := sw.Exec(p)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(bCtx)
+ if _, jobError := err.(ExecError); jobError {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(bCtx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
} else {
- p.ww.PushWorker(w)
+ sp.ww.PushWorker(w)
}
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
- p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -149,25 +157,38 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.Exec(rqs)
+ return sp.Exec(p)
}
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(bCtx)
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
return EmptyPayload, err
}
} else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
+ sp.ww.PushWorker(w)
}
return rsp, nil
}
+func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
+ sw, err := sp.ww.allocator()
+ if err != nil {
+ return EmptyPayload, err
+ }
+
+ r, err := sw.(SyncWorker).Exec(p)
+
+ if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
const op = errors.Op("Exec")
w, err := p.ww.GetFreeWorker(context.Background())
@@ -237,18 +258,18 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
}
// Destroy all underlying stack (but let them to complete the task).
-func (p *StaticPool) Destroy(ctx context.Context) {
- p.ww.Destroy(ctx)
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
}
// allocate required number of stack
-func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
var workers []WorkerBase
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout)
- w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
if err != nil {
cancel()
return nil, err
@@ -259,11 +280,11 @@ func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]W
return workers, nil
}
-func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(ctx)
+func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(ctx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
return err
}
}