summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 15:31:30 +0300
committerGitHub <[email protected]>2021-08-31 15:31:30 +0300
commit83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree884dd2991acf12826752632b8321410e7cc923ce /pkg/pool
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/static_pool.go6
-rw-r--r--pkg/pool/supervisor_test.go50
2 files changed, 54 insertions, 2 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 3eb0714f..7e190846 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -329,7 +329,9 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
return nil, errors.E(op, err)
}
- err = sw.Stop()
+ // destroy the worker
+ sw.State().Set(worker.StateDestroyed)
+ err = sw.Kill()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
return nil, errors.E(op, err)
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index d1b24574..14df513e 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "os"
"os/exec"
"testing"
"time"
@@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
<-block
p.Destroy(context.Background())
}
+
+func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(2),
+ AllocateTimeout: time.Second * 15,
+ DestroyTimeout: time.Second * 5,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ require.NotNil(t, p)
+
+ time.Sleep(time.Second)
+
+ // should be ok
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ require.NoError(t, err)
+
+ // after creating this file, PHP will fail
+ file, err := os.Create("break")
+ require.NoError(t, err)
+
+ time.Sleep(time.Second * 5)
+ assert.NoError(t, file.Close())
+ assert.NoError(t, os.Remove("break"))
+
+ defer func() {
+ if r := recover(); r != nil {
+ assert.Fail(t, "panic should not be fired!")
+ } else {
+ p.Destroy(context.Background())
+ }
+ }()
+}