summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
committerValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
commit2a58b1be2c79f2fe10c0a429878937661645a928 (patch)
treef3a7cd472c75c4dd2a97bcf97cb154731ed81230 /pkg
parent970014530a23d57a3be41c6369ac6456d0b36ae1 (diff)
- Fix bug with the worker reallocating during the response
- Update .golangci and fix new warnings

Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r--pkg/pool/config.go2
-rwxr-xr-xpkg/pool/static_pool.go43
-rwxr-xr-xpkg/pool/static_pool_test.go10
-rwxr-xr-xpkg/pool/supervisor_pool.go1
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go2
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go2
-rwxr-xr-xpkg/transport/socket/socket_factory.go14
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go56
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go42
-rwxr-xr-xpkg/worker/state.go3
-rwxr-xr-xpkg/worker/sync_worker.go4
-rwxr-xr-xpkg/worker/worker.go1
-rw-r--r--pkg/worker_watcher/container/stack.go143
-rw-r--r--pkg/worker_watcher/container/stack_test.go143
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go73
15 files changed, 125 insertions, 414 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 782f7ce9..2a3dabe4 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behaviour.
+// Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 0617cbc0..c8e45b82 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint
+ err_encoder ErrorEncoder //nolint:golint,stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -159,11 +159,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return sp.Exec(p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
-
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -188,11 +189,13 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execWithTTL(ctx, p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -206,19 +209,15 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
- const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- w.State().Set(worker.StateDestroyed)
- sp.ww.Remove(w)
- err := sp.ww.Allocate()
- if err != nil {
- return errors.E(op, err)
- }
- } else {
+//go:inline
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
+ if w.State().NumExecs() >= sp.cfg.MaxJobs {
+ w.State().Set(worker.StateMaxJobsReached)
sp.ww.Push(w)
+ return
}
- return nil
+
+ sp.ww.Push(w)
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
@@ -281,9 +280,9 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
- ctx, cancel := context.WithTimeout(ctx, timeout)
+ ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
if err != nil {
return nil, err
}
@@ -316,7 +315,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b1318f9d..2d8aad48 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -213,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg = Config{
+ var cfg2 = Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -223,7 +223,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- cfg,
+ cfg2,
AddListeners(listener),
)
assert.NoError(t, err)
@@ -361,7 +361,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
- AllocateTimeout: time.Second,
+ AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
@@ -432,8 +432,8 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(payload.Payload{Body: []byte("100")})
- if err != nil {
+ _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
}()
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 5abeae7a..273adc30 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -166,7 +166,6 @@ func (sp *supervised) Stop() {
func (sp *supervised) control() {
now := time.Now()
- const op = errors.Op("supervised_pool_control_tick")
// MIGHT BE OUTDATED
// It's a copy of the Workers pointers
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index a00b2117..51befb1e 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -281,7 +281,7 @@ func Test_Echo2(t *testing.T) {
assert.NoError(t, sw.Wait())
}()
defer func() {
- err := sw.Stop()
+ err = sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index fb40ecb0..3ef65be8 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -299,7 +299,7 @@ func Test_Echo(t *testing.T) {
assert.NoError(t, sw.Wait())
}()
defer func() {
- err := sw.Stop()
+ err = sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index f58f9561..990eb384 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -88,7 +88,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
- ctx, cancel := context.WithTimeout(ctx, f.tout)
+ ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
@@ -108,7 +108,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
return
}
- rl, err := f.findRelayWithContext(ctx, w)
+ rl, err := f.findRelayWithContext(ctxT, w)
if err != nil {
err = multierr.Combine(
err,
@@ -179,18 +179,19 @@ func (f *Factory) Close() error {
// waits for Process to connect over socket and returns associated relay of timeout
func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
- ticker := time.NewTicker(time.Millisecond * 100)
+ ticker := time.NewTicker(time.Millisecond * 10)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
+ // check for the process exists
_, err := process.NewProcess(int32(w.Pid()))
if err != nil {
return nil, err
}
default:
- tmp, ok := f.relays.Load(w.Pid())
+ tmp, ok := f.relays.LoadAndDelete(w.Pid())
if !ok {
continue
}
@@ -221,8 +222,3 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
f.relays.Store(pid, relay)
}
-
-// deletes relay chan associated with specific pid
-func (f *Factory) removeRelayFromPid(pid int64) {
- f.relays.Delete(pid)
-}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 1361693b..b875e2c8 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -19,8 +19,8 @@ func Test_Tcp_Start2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -55,7 +55,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
f := NewSocketServer(ls, time.Minute)
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -75,8 +75,8 @@ func Test_Tcp_StartError2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -131,8 +131,8 @@ func Test_Tcp_Invalid2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -151,8 +151,8 @@ func Test_Tcp_Broken2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -181,8 +181,8 @@ func Test_Tcp_Broken2(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -206,8 +206,8 @@ func Test_Tcp_Echo2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -244,7 +244,7 @@ func Test_Unix_Start2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -268,7 +268,7 @@ func Test_Unix_Failboot2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -295,7 +295,7 @@ func Test_Unix_Timeout2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -311,7 +311,7 @@ func Test_Unix_Invalid2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -326,8 +326,8 @@ func Test_Unix_Broken2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
- assert.NoError(t, err)
+ errC := ls.Close()
+ assert.NoError(t, errC)
}()
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
@@ -351,8 +351,8 @@ func Test_Unix_Broken2(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -376,7 +376,7 @@ func Test_Unix_Echo2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -412,7 +412,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(b, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(b, err)
}()
@@ -439,7 +439,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(b, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(b, err)
}()
@@ -472,8 +472,8 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
b.Errorf("error closing the listener: error %v", err)
}
}()
@@ -503,8 +503,8 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
b.Errorf("error closing the listener: error %v", err)
}
}()
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index a8dd0fe0..34fe088b 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -22,7 +22,7 @@ func Test_Tcp_Start(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -60,7 +60,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
f := NewSocketServer(ls, time.Minute)
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -82,7 +82,7 @@ func Test_Tcp_StartError(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -143,7 +143,7 @@ func Test_Tcp_Timeout(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -166,7 +166,7 @@ func Test_Tcp_Invalid(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -188,8 +188,8 @@ func Test_Tcp_Broken(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -218,8 +218,8 @@ func Test_Tcp_Broken(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -284,7 +284,7 @@ func Test_Unix_Start(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -314,7 +314,7 @@ func Test_Unix_Failboot(t *testing.T) {
ctx := context.Background()
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -347,7 +347,7 @@ func Test_Unix_Timeout(t *testing.T) {
ctx := context.Background()
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -369,7 +369,7 @@ func Test_Unix_Invalid(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -390,8 +390,8 @@ func Test_Unix_Broken(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -422,8 +422,8 @@ func Test_Unix_Broken(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -559,7 +559,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}
@@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
index 176e151b..502f8199 100755
--- a/pkg/worker/state.go
+++ b/pkg/worker/state.go
@@ -27,6 +27,9 @@ const (
// State of worker, when no need to allocate new one
StateDestroyed
+ // State of worker, when it reached executions limit
+ StateMaxJobsReached
+
// StateStopped - process has been terminated.
StateStopped
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index ac987c14..7a1f3131 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -44,7 +44,7 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.SoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
@@ -91,7 +91,7 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
- if errors.Is(errors.SoftJob, err) == false {
+ if errors.Is(errors.SoftJob, err) == false { //nolint:gosimple
tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 0f7ab755..b04e1363 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -46,7 +46,6 @@ type Process struct {
// InitBaseWorker creates new Process over given exec.cmd.
func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
- const op = errors.Op("init_base_worker")
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go
deleted file mode 100644
index fb8ecd3b..00000000
--- a/pkg/worker_watcher/container/stack.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package container
-
-import (
- "context"
- "runtime"
- "sync"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Stack struct {
- sync.RWMutex
- workers []worker.BaseProcess
- destroy bool
- actualNumOfWorkers uint64
- initialNumOfWorkers uint64
-}
-
-func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
- w := runtime.NumCPU()
- return &Stack{
- workers: make([]worker.BaseProcess, 0, w),
- actualNumOfWorkers: 0,
- initialNumOfWorkers: initialNumOfWorkers,
- }
-}
-
-func (stack *Stack) Reset() {
- stack.Lock()
- defer stack.Unlock()
- stack.actualNumOfWorkers = 0
- stack.workers = nil
-}
-
-// Push worker back to the vec
-// If vec in destroy state, Push will provide 100ms window to unlock the mutex
-func (stack *Stack) Push(w worker.BaseProcess) {
- stack.Lock()
- defer stack.Unlock()
- stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w)
-}
-
-func (stack *Stack) IsEmpty() bool {
- stack.Lock()
- defer stack.Unlock()
- return len(stack.workers) == 0
-}
-
-func (stack *Stack) Pop() (worker.BaseProcess, bool) {
- stack.Lock()
- defer stack.Unlock()
-
- // do not release new vec
- if stack.destroy {
- return nil, true
- }
-
- if len(stack.workers) == 0 {
- return nil, false
- }
-
- // move worker
- w := stack.workers[len(stack.workers)-1]
- stack.workers = stack.workers[:len(stack.workers)-1]
- stack.actualNumOfWorkers--
- return w, false
-}
-
-func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.Lock()
- defer stack.Unlock()
- for i := 0; i < len(stack.workers); i++ {
- // worker in the vec, reallocating
- if stack.workers[i].Pid() == pid {
- stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
- stack.actualNumOfWorkers--
- // worker found and removed
- return true
- }
- }
- // no worker with such ID
- return false
-}
-
-// Workers return copy of the workers in the vec
-func (stack *Stack) Workers() []worker.BaseProcess {
- stack.Lock()
- defer stack.Unlock()
- workersCopy := make([]worker.BaseProcess, 0, 1)
- // copy
- // TODO pointers, copy have no sense
- for _, v := range stack.workers {
- if v != nil {
- workersCopy = append(workersCopy, v)
- }
- }
-
- return workersCopy
-}
-
-func (stack *Stack) isDestroying() bool {
- stack.Lock()
- defer stack.Unlock()
- return stack.destroy
-}
-
-// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(_ context.Context) {
- stack.Lock()
- stack.destroy = true
- stack.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 500)
- defer tt.Stop()
- for {
- select {
- case <-tt.C:
- stack.Lock()
- // that might be one of the workers is working
- if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.Unlock()
- continue
- }
- stack.Unlock()
- // unnecessary mutex, but
- // just to make sure. All vec at this moment are in the vec
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.Lock()
- for i := 0; i < len(stack.workers); i++ {
- // set state for the vec in the vec (unused at the moment)
- stack.workers[i].State().Set(worker.StateDestroyed)
- // kill the worker
- _ = stack.workers[i].Kill()
- }
- stack.Unlock()
- // clear
- stack.Reset()
- return
- }
- }
-}
diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go
deleted file mode 100644
index d699664c..00000000
--- a/pkg/worker_watcher/container/stack_test.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package container
-
-import (
- "context"
- "os/exec"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func TestNewWorkersStack(t *testing.T) {
- stack := NewWorkersStack(0)
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []worker.BaseProcess{}, stack.workers)
-}
-
-func TestStack_Push(t *testing.T) {
- stack := NewWorkersStack(1)
-
- w, err := worker.InitBaseWorker(&exec.Cmd{})
- assert.NoError(t, err)
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-}
-
-func TestStack_Pop(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
-
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- _, _ = stack.Pop()
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_FindAndRemoveByPid(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- stack.FindAndRemoveByPid(w.Pid())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_IsEmpty(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
-
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- sw := worker.From(w)
- stack.Push(sw)
-
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- assert.Equal(t, false, stack.IsEmpty())
-}
-
-func TestStack_Workers(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- wrks := stack.Workers()
- assert.Equal(t, 1, len(wrks))
- assert.Equal(t, w.Pid(), wrks[0].Pid())
-}
-
-func TestStack_Reset(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
- stack.Reset()
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_Destroy(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- stack.Destroy(context.Background())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_DestroyWithWait(t *testing.T) {
- stack := NewWorkersStack(2)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
- stack.Push(sw)
- assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
-
- go func() {
- wrk, _ := stack.Pop()
- time.Sleep(time.Second * 3)
- stack.Push(wrk)
- }()
- time.Sleep(time.Second)
- stack.Destroy(context.Background())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cc8cc2b6..a6dfe43e 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
}
// =========================================================
// SLOW PATH
- _ = w.Kill()
+ _ = w.Kill() // how the worker get here???????
// no free workers in the container
// try to continuously get free one
for {
- select {
- default:
- w, stop = ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
+ w, stop = ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
}
+ }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- c <- get{
- w,
- nil,
- }
- return
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
}
+ return
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
}
}()
@@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
+ if w.State().Value() != worker.StateReady {
+ _ = w.Kill()
+ return
+ }
ww.container.Enqueue(w)
}
@@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
- for {
+ for { //nolint:gosimple
select {
case <-tt.C:
ww.Lock()