summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pipe_factory_test.go2
-rw-r--r--plugins/config/tests/config_test.go3
-rw-r--r--plugins/config/viper.go7
-rw-r--r--plugins/factory/app.go1
-rw-r--r--plugins/factory/tests/factory_test.go3
-rw-r--r--plugins/rpc/config_test.go2
-rw-r--r--pool_supervisor.go14
-rw-r--r--socket_factory_test.go41
-rw-r--r--sync_worker.go1
9 files changed, 34 insertions, 40 deletions
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 4eda21a6..95eededa 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -236,4 +236,4 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
b.Fail()
}
}
-} \ No newline at end of file
+}
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index cf5d8489..c85a841f 100644
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -40,7 +40,7 @@ func TestViperProvider_Init(t *testing.T) {
}
// stop by CTRL+C
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
tt := time.NewTicker(time.Second * 2)
@@ -63,5 +63,4 @@ func TestViperProvider_Init(t *testing.T) {
return
}
}
-
}
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index b276dbe2..0c34313c 100644
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -14,7 +14,6 @@ type ViperProvider struct {
Prefix string
}
-//////// ENDURE //////////
func (v *ViperProvider) Init() error {
v.viper = viper.New()
@@ -35,8 +34,6 @@ func (v *ViperProvider) Init() error {
return v.viper.ReadInConfig()
}
-///////////// VIPER ///////////////
-
// Overwrite overwrites existing config with provided values
func (v *ViperProvider) Overwrite(values map[string]string) error {
if len(values) != 0 {
@@ -71,8 +68,6 @@ func (v *ViperProvider) Has(name string) bool {
return v.viper.IsSet(name)
}
-/////////// PRIVATE //////////////
-
func parseFlag(flag string) (string, string, error) {
if !strings.Contains(flag, "=") {
return "", "", fmt.Errorf("invalid flag `%s`", flag)
@@ -88,7 +83,7 @@ func parseValue(value string) string {
if escape == '"' || escape == '\'' || escape == '`' {
value = strings.Trim(value, string(escape))
- value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1)
+ value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape))
}
return value
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 4106b96f..e4002963 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -33,7 +33,6 @@ type AppConfig struct {
type App struct {
cfg AppConfig
configProvider config.Provider
- factory roadrunner.Factory
}
func (app *App) Init(provider config.Provider) error {
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 72e28f84..5347083a 100644
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -57,7 +57,7 @@ func TestFactory(t *testing.T) {
}
// stop by CTRL+C
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
tt := time.NewTicker(time.Second * 2)
@@ -80,5 +80,4 @@ func TestFactory(t *testing.T) {
return
}
}
-
}
diff --git a/plugins/rpc/config_test.go b/plugins/rpc/config_test.go
index dcc24ec1..36927dd2 100644
--- a/plugins/rpc/config_test.go
+++ b/plugins/rpc/config_test.go
@@ -4,13 +4,11 @@ import (
"testing"
json "github.com/json-iterator/go"
- //"github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
)
type testCfg struct{ cfg string }
-
func (cfg *testCfg) Unmarshal(out interface{}) error {
j := json.ConfigCompatibleWithStandardLibrary
return j.Unmarshal([]byte(cfg.cfg), out)
diff --git a/pool_supervisor.go b/pool_supervisor.go
index 73c1c5b7..c0a6ecd9 100644
--- a/pool_supervisor.go
+++ b/pool_supervisor.go
@@ -57,6 +57,7 @@ func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, wa
maxPoolMemory: maxPoolMemory,
maxWorkerTTL: maxTtl,
maxWorkerIdle: maxIdle,
+ watchTimeout: watchTimeout,
stopCh: make(chan struct{}),
}
}
@@ -102,7 +103,7 @@ func (sps *staticPoolSupervisor) control() error {
// THIS IS A COPY OF WORKERS
workers := sps.pool.Workers()
- var totalUsedMemory uint64
+ totalUsedMemory := uint64(0)
for i := 0; i < len(workers); i++ {
if workers[i].State().Value() == StateInvalid {
@@ -111,8 +112,13 @@ func (sps *staticPoolSupervisor) control() error {
s, err := WorkerProcessState(workers[i])
if err != nil {
- panic(err)
- // push to pool events??
+ err2 := sps.pool.RemoveWorker(ctx, workers[i])
+ if err2 != nil {
+ sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)}
+ return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)
+ }
+ sps.pool.Events() <- PoolEvent{Payload: err}
+ return err
}
if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) {
@@ -169,8 +175,6 @@ func (sps *staticPoolSupervisor) control() error {
// if current usage more than max allowed pool memory usage
if totalUsedMemory > sps.maxPoolMemory {
- // destroy pool
- totalUsedMemory = 0
sps.pool.Destroy(ctx)
}
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 0c953b33..cfb95ca1 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -187,12 +187,14 @@ func Test_Tcp_Broken(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- //go func() {
- // err := w.Wait()
- //
- // assert.Error(t, err)
- // assert.Contains(t, err.Error(), "undefined_function()")
- //}()
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := w.Wait(context.Background())
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "undefined_function()")
+ }()
defer func() {
time.Sleep(time.Second)
@@ -210,6 +212,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
+ wg.Wait()
}
func Test_Tcp_Echo(t *testing.T) {
@@ -230,9 +233,9 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
- //go func() {
- // assert.NoError(t, w.Wait())
- //}()
+ go func() {
+ assert.NoError(t, w.Wait(context.Background()))
+ }()
defer func() {
err = w.Stop(ctx)
if err != nil {
@@ -275,9 +278,9 @@ func Test_Unix_Start(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, w)
- //go func() {
- // assert.NoError(t, w.Wait())
- //}()
+ go func() {
+ assert.NoError(t, w.Wait(context.Background()))
+ }()
err = w.Stop(ctx)
if err != nil {
@@ -418,9 +421,9 @@ func Test_Unix_Echo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- //go func() {
- // assert.NoError(t, w.Wait())
- //}()
+ go func() {
+ assert.NoError(t, w.Wait(context.Background()))
+ }()
defer func() {
err = w.Stop(ctx)
if err != nil {
@@ -465,11 +468,9 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
if err != nil {
b.Fatal(err)
}
- //go func() {
- // if w.Wait() != nil {
- // b.Fail()
- // }
- //}()
+ go func() {
+ assert.NoError(b, w.Wait(context.Background()))
+ }()
err = w.Stop(ctx)
if err != nil {
diff --git a/sync_worker.go b/sync_worker.go
index a6e1ed01..de9491d6 100644
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -122,7 +122,6 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
tw.w.State().RegisterExec()
return rsp, nil
-
}
func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {