diff options
-rw-r--r-- | pipe_factory_test.go | 2 | ||||
-rw-r--r-- | plugins/config/tests/config_test.go | 3 | ||||
-rw-r--r-- | plugins/config/viper.go | 7 | ||||
-rw-r--r-- | plugins/factory/app.go | 1 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 3 | ||||
-rw-r--r-- | plugins/rpc/config_test.go | 2 | ||||
-rw-r--r-- | pool_supervisor.go | 14 | ||||
-rw-r--r-- | socket_factory_test.go | 41 | ||||
-rw-r--r-- | sync_worker.go | 1 |
9 files changed, 34 insertions, 40 deletions
diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 4eda21a6..95eededa 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -236,4 +236,4 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { b.Fail() } } -}
\ No newline at end of file +} diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go index cf5d8489..c85a841f 100644 --- a/plugins/config/tests/config_test.go +++ b/plugins/config/tests/config_test.go @@ -40,7 +40,7 @@ func TestViperProvider_Init(t *testing.T) { } // stop by CTRL+C - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) tt := time.NewTicker(time.Second * 2) @@ -63,5 +63,4 @@ func TestViperProvider_Init(t *testing.T) { return } } - } diff --git a/plugins/config/viper.go b/plugins/config/viper.go index b276dbe2..0c34313c 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,7 +14,6 @@ type ViperProvider struct { Prefix string } -//////// ENDURE ////////// func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -35,8 +34,6 @@ func (v *ViperProvider) Init() error { return v.viper.ReadInConfig() } -///////////// VIPER /////////////// - // Overwrite overwrites existing config with provided values func (v *ViperProvider) Overwrite(values map[string]string) error { if len(values) != 0 { @@ -71,8 +68,6 @@ func (v *ViperProvider) Has(name string) bool { return v.viper.IsSet(name) } -/////////// PRIVATE ////////////// - func parseFlag(flag string) (string, string, error) { if !strings.Contains(flag, "=") { return "", "", fmt.Errorf("invalid flag `%s`", flag) @@ -88,7 +83,7 @@ func parseValue(value string) string { if escape == '"' || escape == '\'' || escape == '`' { value = strings.Trim(value, string(escape)) - value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1) + value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape)) } return value diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 4106b96f..e4002963 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -33,7 +33,6 @@ type AppConfig struct { type App struct { cfg AppConfig configProvider config.Provider - factory roadrunner.Factory } func (app *App) Init(provider config.Provider) error { diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 72e28f84..5347083a 100644 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -57,7 +57,7 @@ func TestFactory(t *testing.T) { } // stop by CTRL+C - c := make(chan os.Signal) + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) tt := time.NewTicker(time.Second * 2) @@ -80,5 +80,4 @@ func TestFactory(t *testing.T) { return } } - } diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go index dcc24ec1..36927dd2 100644 --- a/plugins/rpc/config_test.go +++ b/plugins/rpc/config_test.go @@ -4,13 +4,11 @@ import ( "testing" json "github.com/json-iterator/go" - //"github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" ) type testCfg struct{ cfg string } - func (cfg *testCfg) Unmarshal(out interface{}) error { j := json.ConfigCompatibleWithStandardLibrary return j.Unmarshal([]byte(cfg.cfg), out) diff --git a/pool_supervisor.go b/pool_supervisor.go index 73c1c5b7..c0a6ecd9 100644 --- a/pool_supervisor.go +++ b/pool_supervisor.go @@ -57,6 +57,7 @@ func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, wa maxPoolMemory: maxPoolMemory, maxWorkerTTL: maxTtl, maxWorkerIdle: maxIdle, + watchTimeout: watchTimeout, stopCh: make(chan struct{}), } } @@ -102,7 +103,7 @@ func (sps *staticPoolSupervisor) control() error { // THIS IS A COPY OF WORKERS workers := sps.pool.Workers() - var totalUsedMemory uint64 + totalUsedMemory := uint64(0) for i := 0; i < len(workers); i++ { if workers[i].State().Value() == StateInvalid { @@ -111,8 +112,13 @@ func (sps *staticPoolSupervisor) control() error { s, err := WorkerProcessState(workers[i]) if err != nil { - panic(err) - // push to pool events?? + err2 := sps.pool.RemoveWorker(ctx, workers[i]) + if err2 != nil { + sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)} + return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2) + } + sps.pool.Events() <- PoolEvent{Payload: err} + return err } if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) { @@ -169,8 +175,6 @@ func (sps *staticPoolSupervisor) control() error { // if current usage more than max allowed pool memory usage if totalUsedMemory > sps.maxPoolMemory { - // destroy pool - totalUsedMemory = 0 sps.pool.Destroy(ctx) } diff --git a/socket_factory_test.go b/socket_factory_test.go index 0c953b33..cfb95ca1 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -187,12 +187,14 @@ func Test_Tcp_Broken(t *testing.T) { if err != nil { t.Fatal(err) } - //go func() { - // err := w.Wait() - // - // assert.Error(t, err) - // assert.Contains(t, err.Error(), "undefined_function()") - //}() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := w.Wait(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "undefined_function()") + }() defer func() { time.Sleep(time.Second) @@ -210,6 +212,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) + wg.Wait() } func Test_Tcp_Echo(t *testing.T) { @@ -230,9 +233,9 @@ func Test_Tcp_Echo(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) - //go func() { - // assert.NoError(t, w.Wait()) - //}() + go func() { + assert.NoError(t, w.Wait(context.Background())) + }() defer func() { err = w.Stop(ctx) if err != nil { @@ -275,9 +278,9 @@ func Test_Unix_Start(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, w) - //go func() { - // assert.NoError(t, w.Wait()) - //}() + go func() { + assert.NoError(t, w.Wait(context.Background())) + }() err = w.Stop(ctx) if err != nil { @@ -418,9 +421,9 @@ func Test_Unix_Echo(t *testing.T) { if err != nil { t.Fatal(err) } - //go func() { - // assert.NoError(t, w.Wait()) - //}() + go func() { + assert.NoError(t, w.Wait(context.Background())) + }() defer func() { err = w.Stop(ctx) if err != nil { @@ -465,11 +468,9 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { if err != nil { b.Fatal(err) } - //go func() { - // if w.Wait() != nil { - // b.Fail() - // } - //}() + go func() { + assert.NoError(b, w.Wait(context.Background())) + }() err = w.Stop(ctx) if err != nil { diff --git a/sync_worker.go b/sync_worker.go index a6e1ed01..de9491d6 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -122,7 +122,6 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { tw.w.State().RegisterExec() return rsp, nil - } func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { |