summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xplugins/config/tests/config_test.go2
-rwxr-xr-xplugins/factory/app.go9
-rwxr-xr-xplugins/factory/tests/plugin_1.go2
-rwxr-xr-xplugins/factory/tests/plugin_2.go20
-rwxr-xr-xplugins/rpc/rpc.go3
-rwxr-xr-xplugins/rpc/rpc_test.go1
-rwxr-xr-xpool.go3
-rwxr-xr-xsocket_factory.go3
-rwxr-xr-xstatic_pool.go13
-rwxr-xr-xstatic_pool_test.go4
-rwxr-xr-xsupervisor_pool.go13
-rwxr-xr-xsync_worker.go3
-rwxr-xr-xsync_worker_test.go20
-rwxr-xr-xworker.go3
-rwxr-xr-xworker_watcher.go4
15 files changed, 42 insertions, 61 deletions
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index c85a841f..14e60ac2 100755
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -48,7 +48,7 @@ func TestViperProvider_Init(t *testing.T) {
for {
select {
case e := <-errCh:
- assert.NoError(t, e.Error.Err)
+ assert.NoError(t, e.Error)
assert.NoError(t, container.Stop())
return
case <-c:
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 62da4f53..4951e3df 100755
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -3,15 +3,16 @@ package factory
import (
"context"
"fmt"
+ "log"
+ "os"
+ "os/exec"
+ "strings"
+
"github.com/fatih/color"
"github.com/spiral/endure/errors"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
- "log"
- "os"
- "os/exec"
- "strings"
)
const ServiceName = "app"
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index df632481..9011bb00 100755
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -29,7 +29,7 @@ func (f *Foo) Serve() chan error {
return errCh
}
- cmd, err := f.spawner.CommandFactory(nil)
+ cmd, err := f.spawner.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index dbdb065b..9f401bec 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -14,13 +14,11 @@ import (
type Foo2 struct {
configProvider config.Provider
wf factory.AppFactory
- spw factory.Spawner
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error {
+func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error {
f.configProvider = p
f.wf = workerFactory
- f.spw = spawner
return nil
}
@@ -34,7 +32,7 @@ func (f *Foo2) Serve() chan error {
return errCh
}
- cmd, err := f.spw.CommandFactory(nil)
+ cmd, err := f.wf.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
@@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error {
_ = w
- poolConfig := &roadrunner.Config{
+ poolConfig := roadrunner.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- TTL: 1000,
- IdleTTL: 1000,
- ExecTTL: time.Second * 10,
- MaxPoolMemory: 10000,
- MaxWorkerMemory: 10000,
+ Supervisor: roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: time.Second * 10,
+ MaxWorkerMemory: 1000,
+ },
}
pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index a17b569f..0f6c9753 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,11 +1,12 @@
package rpc
import (
+ "net/rpc"
+
"github.com/spiral/endure"
"github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
- "net/rpc"
)
// RPCPluggable declares the ability to create set of public RPC methods.
diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go
deleted file mode 100755
index 9ab1e3e8..00000000
--- a/plugins/rpc/rpc_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package rpc
diff --git a/pool.go b/pool.go
index fe864878..bc57bcbd 100755
--- a/pool.go
+++ b/pool.go
@@ -2,9 +2,10 @@ package roadrunner
import (
"context"
- "github.com/spiral/roadrunner/v2/util"
"runtime"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
diff --git a/socket_factory.go b/socket_factory.go
index 0db7849b..ed151f2d 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -2,13 +2,14 @@ package roadrunner
import (
"context"
- "github.com/shirou/gopsutil/process"
"net"
"os/exec"
"strings"
"sync"
"time"
+ "github.com/shirou/gopsutil/process"
+
"github.com/pkg/errors"
"github.com/spiral/goridge/v2"
"go.uber.org/multierr"
diff --git a/static_pool.go b/static_pool.go
index 31923134..4ecbdd41 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -3,10 +3,11 @@ package roadrunner
import (
"context"
"fmt"
- "github.com/spiral/roadrunner/v2/util"
"os/exec"
"sync"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
)
@@ -35,7 +36,7 @@ type StaticPool struct {
ww *workerWatcher
// supervises memory and TTL of workers
- sp *supervisedPool
+ // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -74,8 +75,8 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con
}
// todo: implement
- //p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
- //p.sp.Start()
+ // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // p.sp.Start()
return p, nil
}
@@ -167,7 +168,7 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
}
// Exec one task with given payload and context, returns result or error.
-//func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
// // todo: why TODO passed here?
// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
// defer cancel()
@@ -235,7 +236,7 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
// }
//
// return rsp, nil
-//}
+// }
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {
diff --git a/static_pool_test.go b/static_pool_test.go
index 4a0c483a..ec80e92a 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -157,7 +157,7 @@ func Test_StaticPool_JobError(t *testing.T) {
}
// TODO temporary commented, figure out later
-//func Test_StaticPool_Broken_Replace(t *testing.T) {
+// func Test_StaticPool_Broken_Replace(t *testing.T) {
// ctx := context.Background()
// p, err := NewPool(
// ctx,
@@ -197,7 +197,7 @@ func Test_StaticPool_JobError(t *testing.T) {
// wg.Wait()
//
// p.Destroy(ctx)
-//}
+// }
//
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
diff --git a/supervisor_pool.go b/supervisor_pool.go
index e63b4a59..9d1d2b1e 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -2,8 +2,9 @@ package roadrunner
import (
"context"
- "github.com/spiral/roadrunner/v2/util"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
const MB = 1024 * 1024
@@ -83,8 +84,6 @@ func (sp *supervisedPool) control() {
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- // TODO events
- //sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)}
err = sp.pool.RemoveWorker(ctx, workers[i])
if err != nil {
sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
@@ -127,13 +126,5 @@ func (sp *supervisedPool) control() {
}
}
}
-
- // the very last step is to calculate pool memory usage (except excluded workers)
- //totalUsedMemory += s.MemoryUsage
}
-
- //// if current usage more than max allowed pool memory usage
- //if totalUsedMemory > sp.maxPoolMemory {
- // sp.pool.Destroy(ctx)
- //}
}
diff --git a/sync_worker.go b/sync_worker.go
index 85aa832e..d7c15e88 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -3,9 +3,10 @@ package roadrunner
import (
"context"
"fmt"
- "github.com/spiral/roadrunner/v2/util"
"time"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index ad1513d7..7f969283 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -2,10 +2,11 @@ package roadrunner
import (
"context"
- "github.com/stretchr/testify/assert"
"os/exec"
"sync"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func Test_Echo(t *testing.T) {
@@ -165,23 +166,6 @@ func Test_Broken(t *testing.T) {
assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()")
wg.Done()
})
-
- //go func() {
- // assert.NotNil(t, w)
- // tt := time.NewTimer(time.Second * 10)
- // defer wg.Done()
- // for {
- // select {
- // case ev := <-w.Events():
- // assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()")
- // return
- // case <-tt.C:
- // assert.Error(t, errors.New("no events from worker"))
- // return
- // }
- // }
- //}()
-
syncWorker, err := NewSyncWorker(w)
if err != nil {
t.Fatal(err)
diff --git a/worker.go b/worker.go
index 05b5712d..2dda51cc 100755
--- a/worker.go
+++ b/worker.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
- "github.com/spiral/roadrunner/v2/util"
"os"
"os/exec"
"strconv"
@@ -12,6 +11,8 @@ import (
"sync"
"time"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/spiral/goridge/v2"
"go.uber.org/multierr"
)
diff --git a/worker_watcher.go b/worker_watcher.go
index 773f7745..25c88a1a 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -3,9 +3,10 @@ package roadrunner
import (
"context"
"errors"
- "github.com/spiral/roadrunner/v2/util"
"sync"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
var ErrWatcherStopped = errors.New("watcher stopped")
@@ -282,7 +283,6 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
for i := 0; i < len(ww.stack.workers); i++ {
// worker in the stack, reallocating
if ww.stack.workers[i].Pid() == pid {
-
ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
ww.decreaseNumOfActualWorkers()
ww.stack.mutex.Unlock()