author     Wolfy-J <[email protected]>   2020-10-25 15:55:51 +0300
committer  Wolfy-J <[email protected]>   2020-10-25 15:55:51 +0300
commit     ba5c562f9038ba434e655fb82c44597fcccaff16 (patch)
tree       ff112b9dcffda63bc40094a57d0df61622368445
parent     3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff)
- massive update in roadrunner 2.0 abstractions
-rw-r--r--  errors.go                                              6
-rw-r--r--  errors_test.go                                         2
-rw-r--r--  go.mod                                                 6
-rw-r--r--  go.sum                                                 1
-rw-r--r--  pipe_factory_test.go                                   8
-rw-r--r--  plugins/config/provider.go                             2
-rw-r--r--  plugins/config/viper.go                                3
-rw-r--r--  plugins/events/broadcaster.go                         24
-rw-r--r--  plugins/factory/app.go                               114
-rw-r--r--  plugins/factory/app_provider.go                       17
-rw-r--r--  plugins/factory/config.go                             37
-rw-r--r--  plugins/factory/factory.go                            73
-rw-r--r--  plugins/factory/hello.php                              1
-rw-r--r--  plugins/factory/tests/plugin_1.go                      4
-rw-r--r--  plugins/factory/tests/plugin_2.go                      8
-rw-r--r--  plugins/rpc/config.go                                  3
-rw-r--r--  plugins/rpc/rpc.go                                    36
-rw-r--r--  pool.go                                               96
-rw-r--r--  pool_supervisor.go                                   182
-rw-r--r--  pool_watcher.go                                      131
-rw-r--r--  socket_factory.go                                      8
-rw-r--r--  socket_factory_test.go                                61
-rw-r--r--  static_pool.go                                       210
-rw-r--r--  static_pool_test.go                                  114
-rw-r--r--  sync_worker.go                                       110
-rw-r--r--  sync_worker_test.go                                   62
-rw-r--r--  util/events.go                                        26
-rw-r--r--  worker.go                                             99
-rw-r--r--  worker_test.go                                         3
-rw-r--r--  worker_watcher.go (renamed from workers_watcher.go)  158
30 files changed, 758 insertions, 847 deletions
diff --git a/errors.go b/errors.go
index b9746702..52356549 100644
--- a/errors.go
+++ b/errors.go
@@ -1,11 +1,11 @@
package roadrunner
-// TaskError is job level error (no WorkerProcess halt), wraps at top
+// JobError is job level error (no WorkerProcess halt), wraps at top
// of error context
-type TaskError []byte
+type JobError []byte
// Error converts error context to string
-func (te TaskError) Error() string {
+func (te JobError) Error() string {
return string(te)
}
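
A minimal sketch of how callers distinguish the renamed JobError from fatal worker errors (assuming a SyncWorker sw as constructed in the tests below):

    res, err := sw.Exec(Payload{Body: []byte("hello")})
    if err != nil {
        if _, soft := err.(JobError); soft {
            // soft job error: the worker stays alive, err carries the error text
        } else {
            // hard error: the caller stops and replaces the worker
        }
    }
    _ = res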
diff --git a/errors_test.go b/errors_test.go
index 69f1c9ec..75a86840 100644
--- a/errors_test.go
+++ b/errors_test.go
@@ -8,7 +8,7 @@ import (
)
func Test_JobError_Error(t *testing.T) {
- e := TaskError([]byte("error"))
+ e := JobError([]byte("error"))
assert.Equal(t, "error", e.Error())
}
diff --git a/go.mod b/go.mod
index ddf0fe98..322a7022 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/shirou/gopsutil v2.20.9+incompatible
github.com/spf13/viper v1.7.1
- github.com/spiral/endure v1.0.0-beta8
+ github.com/spiral/endure v1.0.0-beta9
github.com/spiral/goridge/v2 v2.4.5
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
@@ -20,3 +20,7 @@ require (
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.5 // indirect
)
+
+replace (
+ github.com/spiral/endure v1.0.0-beta9 => ./../endure
+)
\ No newline at end of file
diff --git a/go.sum b/go.sum
index 72ca37a7..85dbeb85 100644
--- a/go.sum
+++ b/go.sum
@@ -186,6 +186,7 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.0-beta8 h1:bKVe7F8CbvDZt8UYX3WoSV49OpFPHiM9Py55i7USPK8=
github.com/spiral/endure v1.0.0-beta8/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ=
+github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ=
github.com/spiral/goridge/v2 v2.4.5 h1:rg4lLEJLrEh1Wj6G1qTsYVbYiQvig6mOR1F9GyDIGm8=
github.com/spiral/goridge/v2 v2.4.5/go.mod h1:C/EZKFPON9lypi8QO7I5ObgVmrIzTmhZqFz/tmypcGc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 95eededa..ee2510f3 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
index 580231fd..ac33b3de 100644
--- a/plugins/config/provider.go
+++ b/plugins/config/provider.go
@@ -1,7 +1,7 @@
package config
type Provider interface {
- // Unmarshal configuration section into configuration object.
+ // UnmarshalKey reads the configuration section into a configuration object.
//
// func (h *HttpService) Init(cp config.Provider) error {
// h.config := &HttpConfig{}
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index 0c34313c..4e85af6b 100644
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -14,6 +14,7 @@ type ViperProvider struct {
Prefix string
}
+// Init initializes the config provider.
func (v *ViperProvider) Init() error {
v.viper = viper.New()
@@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error {
return nil
}
-//
+// UnmarshalKey reads the configuration section into a configuration object.
func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
err := v.viper.UnmarshalKey(name, &out)
if err != nil {
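
A minimal sketch of a plugin consuming Provider.UnmarshalKey in Init, assuming a hypothetical "http" section (it mirrors the example in the provider.go doc comment):

    // assumes: import "github.com/spiral/roadrunner/v2/plugins/config"
    type HTTPConfig struct {
        Address string
    }

    type HTTPService struct {
        cfg HTTPConfig
    }

    // Init reads the "http" configuration section into the service config.
    func (h *HTTPService) Init(cp config.Provider) error {
        return cp.UnmarshalKey("http", &h.cfg)
    }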
diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go
deleted file mode 100644
index 778b307d..00000000
--- a/plugins/events/broadcaster.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package events
-
-type EventListener interface {
- Handle(event interface{})
-}
-
-type EventBroadcaster struct {
- listeners []EventListener
-}
-
-func NewEventBroadcaster() *EventBroadcaster {
- return &EventBroadcaster{}
-}
-
-func (eb *EventBroadcaster) AddListener(l EventListener) {
- // todo: threadcase
- eb.listeners = append(eb.listeners, l)
-}
-
-func (eb *EventBroadcaster) Push(e interface{}) {
- for _, l := range eb.listeners {
- l.Handle(e)
- }
-}
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 74d8d828..b6cdb3b3 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,58 +1,70 @@
package factory
import (
- "errors"
+ "context"
"fmt"
- "os"
- "os/exec"
- "strings"
- "time"
-
+ "github.com/fatih/color"
+ "github.com/spiral/endure/errors"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
+ "log"
+ "os"
+ "os/exec"
+ "strings"
)
-// AppConfig config combines factory, pool and cmd configurations.
-type AppConfig struct {
- Command string
- User string
- Group string
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration.
- RelayTimeout time.Duration
+const ServiceName = "app"
+
+type Env map[string]string
+
+// AppFactory creates workers for the application.
+type AppFactory interface {
+ NewCmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
+// App manages worker creation for the application.
type App struct {
- cfg AppConfig
- configProvider config.Provider
+ cfg Config
+ factory roadrunner.Factory
}
-func (app *App) Init(provider config.Provider) error {
- app.cfg = AppConfig{}
- app.configProvider = provider
-
- err := app.configProvider.UnmarshalKey("app", &app.cfg)
+// Init reads the app config section and applies defaults.
+func (app *App) Init(cfg config.Provider) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
}
+ app.cfg.InitDefaults()
- if app.cfg.Relay == "" {
- app.cfg.Relay = "pipes"
+ return nil
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
}
- return nil
+ return errCh
+}
+
+func (app *App) Stop() error {
+ if app.factory == nil {
+ return nil
+ }
+
+ return app.factory.Close(context.Background())
}
-func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
+func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
+
// create command according to the config
cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
@@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
}, nil
}
-// todo ENV unused
-func (app *App) NewFactory() (roadrunner.Factory, error) {
+func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+}
+
+func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(func(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
+ })
+
+ return p, nil
+}
+
+func (app *App) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
@@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
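
A sketch of a hypothetical plugin consuming the new AppFactory interface through endure dependency injection (names are illustrative; the pattern follows plugin_2.go below):

    type Consumer struct {
        app factory.AppFactory
    }

    func (c *Consumer) Init(app factory.AppFactory) error {
        c.app = app
        return nil
    }

    func (c *Consumer) Serve() chan error {
        errCh := make(chan error, 1)
        p, err := c.app.NewWorkerPool(context.Background(), roadrunner.Config{NumWorkers: 4}, nil)
        if err != nil {
            errCh <- err
            return errCh
        }
        _ = p // use the pool, e.g. p.Exec(...)
        return errCh
    }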
diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go
deleted file mode 100644
index e13b267f..00000000
--- a/plugins/factory/app_provider.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package factory
-
-import (
- "os/exec"
-
- "github.com/spiral/roadrunner/v2"
-)
-
-type Env map[string]string
-
-type Spawner interface {
- // CmdFactory create new command factory with given env variables.
- NewCmd(env Env) (func() *exec.Cmd, error)
-
- // NewFactory inits new factory for workers.
- NewFactory() (roadrunner.Factory, error)
-}
diff --git a/plugins/factory/config.go b/plugins/factory/config.go
new file mode 100644
index 00000000..b2d1d0ad
--- /dev/null
+++ b/plugins/factory/config.go
@@ -0,0 +1,37 @@
+package factory
+
+import "time"
+
+// Config config combines factory, pool and cmd configurations.
+type Config struct {
+ // Command to run as application.
+ Command string
+
+ // User to run application under.
+ User string
+
+ // Group to run application under.
+ Group string
+
+ // Env represents application environment.
+ Env Env
+
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines how long the socket factory will wait for a worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration
+}
+
+func (cfg *Config) InitDefaults() {
+ if cfg.Relay == "" {
+ cfg.Relay = "pipes"
+ }
+
+ if cfg.RelayTimeout == 0 {
+ cfg.RelayTimeout = time.Second * 60
+ }
+}
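
A short sketch of the defaulting behaviour, assuming an otherwise empty Config:

    cfg := factory.Config{Command: "php worker.php"}
    cfg.InitDefaults()
    // cfg.Relay == "pipes"
    // cfg.RelayTimeout == time.Second * 60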
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
deleted file mode 100644
index f7303b6d..00000000
--- a/plugins/factory/factory.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package factory
-
-import (
- "context"
-
- "log"
-
- "github.com/fatih/color"
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/events"
-)
-
-type WorkerFactory interface {
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error)
-}
-
-type WFactory struct {
- events *events.EventBroadcaster
- app Spawner
- wFactory roadrunner.Factory
-}
-
-func (wf *WFactory) Init(app Spawner) (err error) {
- wf.events = events.NewEventBroadcaster()
-
- wf.app = app
- wf.wFactory, err = app.NewFactory()
- if err != nil {
- return nil
- }
-
- return nil
-}
-
-func (wf *WFactory) AddListener(l events.EventListener) {
- wf.events.AddListener(l)
-}
-
-func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) {
- cmd, err := wf.app.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt)
- if err != nil {
- return nil, err
- }
-
- // TODO event to stop
- go func() {
- for e := range p.Events() {
- wf.events.Push(e)
- if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
- if we.Event == roadrunner.EventWorkerLog {
- log.Print(color.YellowString(string(we.Payload.([]byte))))
- }
- }
- }
- }()
-
- return p, nil
-}
-
-func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
- cmd, err := wf.app.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- return wf.wFactory.SpawnWorkerWithContext(ctx, cmd())
-}
diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php
deleted file mode 100644
index c6199449..00000000
--- a/plugins/factory/hello.php
+++ /dev/null
@@ -1 +0,0 @@
-<?php echo "hello -" . time();
\ No newline at end of file
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 5ab6df73..f6b06dd0 100644
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spawner.NewCmd(nil)
+ cmd, err := f.spawner.CommandFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 2409627e..dbdb065b 100644
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -13,11 +13,11 @@ import (
type Foo2 struct {
configProvider config.Provider
- wf factory.WorkerFactory
+ wf factory.AppFactory
spw factory.Spawner
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error {
+func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error {
f.configProvider = p
f.wf = workerFactory
f.spw = spawner
@@ -27,14 +27,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spaw
func (f *Foo2) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spw.NewCmd(nil)
+ cmd, err := f.spw.CommandFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
index 1039ee5e..719fd5e3 100644
--- a/plugins/rpc/config.go
+++ b/plugins/rpc/config.go
@@ -12,6 +12,9 @@ import (
type Config struct {
// Listen string
Listen string
+
+ // Disabled disables RPC service.
+ Disabled bool
}
// InitDefaults allows to init blank config with pre-defined set of default values.
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index ef8e82e4..7b47682f 100644
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,8 +1,7 @@
package rpc
import (
- "errors"
-
+ "github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -14,8 +13,8 @@ type RPCService interface {
RPCService() (interface{}, error)
}
-// ID contains default service name.
-const ID = "rpc"
+// ServiceName contains default service name.
+const ServiceName = "rpc"
type services struct {
service interface{}
@@ -32,14 +31,19 @@ type Service struct {
// Init rpc service. Must return true if service is enabled.
func (s *Service) Init(cfg config.Provider) error {
- err := cfg.UnmarshalKey(ID, &s.config)
+ if !cfg.Has(ServiceName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(ServiceName, &s.config)
if err != nil {
return err
}
-
s.config.InitDefaults()
- // todo: handle disabled
+ if s.config.Disabled {
+ return errors.E(errors.Disabled)
+ }
return s.config.Valid()
}
@@ -47,27 +51,15 @@ func (s *Service) Init(cfg config.Provider) error {
// Serve serves the service.
func (s *Service) Serve() chan error {
s.close = make(chan struct{})
-
errCh := make(chan error, 1)
- server := rpc.NewServer()
- if server == nil {
- errCh <- errors.New("rpc server is nil")
- return errCh
- }
- s.rpc = server
-
- if len(s.services) == 0 {
- // todo: why this is an error?
- errCh <- errors.New("no services with RPC")
- return errCh
- }
+ s.rpc = rpc.NewServer()
// Attach all services
for i := 0; i < len(s.services); i++ {
err := s.Register(s.services[i].name, s.services[i].service)
if err != nil {
- errCh <- err
+ errCh <- errors.E(errors.Op("register service"), err)
return errCh
}
}
@@ -134,7 +126,7 @@ func (s *Service) RegisterService(p RPCService) error {
// no suitable methods. It also logs the error using package log.
func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
- return errors.New("RPC service is not configured")
+ return errors.E("RPC service is not configured")
}
return s.rpc.RegisterName(name, svc)
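
For context on what Register accepts: net/rpc only exposes exported methods with the signature Method(args T, reply *U) error. A hypothetical service sketch (names are illustrative):

    type AppService struct{}

    // Version is callable over RPC as "<registered name>.Version".
    func (a *AppService) Version(_ bool, out *string) error {
        *out = "2.0.0-dev"
        return nil
    }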
diff --git a/pool.go b/pool.go
index 343dedf6..fe864878 100644
--- a/pool.go
+++ b/pool.go
@@ -2,51 +2,53 @@ package roadrunner
import (
"context"
+ "github.com/spiral/roadrunner/v2/util"
"runtime"
"time"
)
+// PoolEvent triggered by pool on different events. Pool can also trigger a WorkerEvent in case of log output.
+type PoolEvent struct {
+ // Event type, see below.
+ Event int64
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
+
const (
// EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct = iota + 100
+ EventWorkerConstruct = iota + 7800
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
- // EventWorkerKill thrown after worker is being forcefully killed.
- EventWorkerKill
-
- // EventWorkerError thrown any worker related even happen (passed with WorkerError)
- EventWorkerEvent
-
- // EventWorkerDead thrown when worker stops worker for any reason.
- EventWorkerDead
-
- // EventPoolError caused on pool wide errors
+ // EventPoolError caused on pool wide errors.
EventPoolError
-)
-const (
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory = iota + 8000
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
- // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ // todo: EventTTL thrown when worker is removed due to TTL being reached. Context is rr.WorkerError
EventTTL
- // EventIdleTTL triggered when worker spends too much time at rest.
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
EventIdleTTL
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
)
// Pool managed set of inner worker processes.
type Pool interface {
- // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK
- Events() chan PoolEvent
+ // AddListener connects event listener to the pool.
+ AddListener(listener util.EventListener)
- // Exec one task with given payload and context, returns result or error.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ // GetConfig returns pool configuration.
+ GetConfig() Config
// Exec
Exec(rqs Payload) (Payload, error)
@@ -54,18 +56,14 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
+ // Remove worker from the pool.
RemoveWorker(ctx context.Context, worker WorkerBase) error
- Config() Config
-
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
}
-// todo: merge with pool options
-
-// Config defines basic behaviour of worker creation and handling process.
-//
+// Config defines the pool behaviour.
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
@@ -84,20 +82,8 @@ type Config struct {
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration
- // TTL defines maximum time worker is allowed to live.
- TTL int64
-
- // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL int64
-
- // ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration
-
- // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
- MaxPoolMemory uint64
-
- // MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor SupervisorConfig
}
// InitDefaults enables default config values.
@@ -113,4 +99,30 @@ func (cfg *Config) InitDefaults() {
if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
+
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL int64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
}
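
A minimal sketch of the listener-based event flow that replaces the Events() channel, assuming a Pool p and the event types used elsewhere in this commit:

    p.AddListener(func(event interface{}) {
        switch e := event.(type) {
        case PoolEvent:
            // pool-level events: EventWorkerConstruct, EventWorkerDestruct, EventPoolError, ...
            log.Printf("pool event %d: %v", e.Event, e.Payload)
        case WorkerEvent:
            if e.Event == EventWorkerLog {
                // worker output forwarded as raw bytes
                log.Print(string(e.Payload.([]byte)))
            }
        }
    })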
diff --git a/pool_supervisor.go b/pool_supervisor.go
deleted file mode 100644
index c0a6ecd9..00000000
--- a/pool_supervisor.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package roadrunner
-
-import (
- "context"
- "errors"
- "fmt"
- "time"
-)
-
-const MB = 1024 * 1024
-
-type Supervisor interface {
- Attach(pool Pool)
- StartWatching() error
- StopWatching()
- Detach()
-}
-
-type staticPoolSupervisor struct {
- // maxWorkerMemory in MB
- maxWorkerMemory uint64
- // maxPoolMemory in MB
- maxPoolMemory uint64
- // maxWorkerTTL in seconds
- maxWorkerTTL uint64
- // maxWorkerIdle in seconds
- maxWorkerIdle uint64
-
- // watchTimeout in seconds
- watchTimeout uint64
- stopCh chan struct{}
-
- pool Pool
-}
-
-/*
-The arguments are:
-maxWorkerMemory - maximum memory allowed for a single worker
-maxPoolMemory - maximum pool memory allowed for a pool of a workers
-maxTtl - maximum ttl for the worker after which it will be killed and replaced
-maxIdle - maximum time to live for the worker in Ready state
-watchTimeout - time between watching for the workers/pool status
-*/
-// TODO might be just wrap the pool and return ControlledPool with included Pool interface
-func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor {
- if maxWorkerMemory == 0 {
- // just set to a big number, 5GB
- maxPoolMemory = 5000 * MB
- }
-
- if watchTimeout == 0 {
- watchTimeout = 60
- }
-
- return &staticPoolSupervisor{
- maxWorkerMemory: maxWorkerMemory,
- maxPoolMemory: maxPoolMemory,
- maxWorkerTTL: maxTtl,
- maxWorkerIdle: maxIdle,
- watchTimeout: watchTimeout,
- stopCh: make(chan struct{}),
- }
-}
-
-func (sps *staticPoolSupervisor) Attach(pool Pool) {
- sps.pool = pool
-}
-
-func (sps *staticPoolSupervisor) StartWatching() error {
- go func() {
- watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout))
- for {
- select {
- case <-sps.stopCh:
- watchTout.Stop()
- return
- // stop here
- case <-watchTout.C:
- err := sps.control()
- if err != nil {
- sps.pool.Events() <- PoolEvent{Payload: err}
- }
- }
- }
- }()
- return nil
-}
-
-func (sps *staticPoolSupervisor) StopWatching() {
- sps.stopCh <- struct{}{}
-}
-
-func (sps *staticPoolSupervisor) Detach() {
-
-}
-
-func (sps *staticPoolSupervisor) control() error {
- if sps.pool == nil {
- return errors.New("pool should be attached")
- }
- now := time.Now()
- ctx := context.TODO()
-
- // THIS IS A COPY OF WORKERS
- workers := sps.pool.Workers()
- totalUsedMemory := uint64(0)
-
- for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == StateInvalid {
- continue
- }
-
- s, err := WorkerProcessState(workers[i])
- if err != nil {
- err2 := sps.pool.RemoveWorker(ctx, workers[i])
- if err2 != nil {
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)}
- return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)
- }
- sps.pool.Events() <- PoolEvent{Payload: err}
- return err
- }
-
- if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) {
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
-
- // after remove worker we should exclude it from further analysis
- workers = append(workers[:i], workers[i+1:]...)
- }
-
- if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB {
- // TODO events
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)}
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
- workers = append(workers[:i], workers[i+1:]...)
- continue
- }
-
- // firs we check maxWorker idle
- if sps.maxWorkerIdle != 0 {
- // then check for the worker state
- if workers[i].State().Value() != StateReady {
- continue
- }
- /*
- Calculate idle time
- If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
- 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
- we are guessing that worker overlap idle time and has to be killed
- */
- // get last used unix nano
- lu := workers[i].State().LastUsed()
- // convert last used to unixNano and sub time.now
- res := int64(lu) - now.UnixNano()
- // maxWorkerIdle more than diff between now and last used
- if int64(sps.maxWorkerIdle)-res <= 0 {
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)}
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
- workers = append(workers[:i], workers[i+1:]...)
- }
- }
-
- // the very last step is to calculate pool memory usage (except excluded workers)
- totalUsedMemory += s.MemoryUsage
- }
-
- // if current usage more than max allowed pool memory usage
- if totalUsedMemory > sps.maxPoolMemory {
- sps.pool.Destroy(ctx)
- }
-
- return nil
-}
diff --git a/pool_watcher.go b/pool_watcher.go
new file mode 100644
index 00000000..6eb614dc
--- /dev/null
+++ b/pool_watcher.go
@@ -0,0 +1,131 @@
+package roadrunner
+
+import (
+ "context"
+ "github.com/spiral/roadrunner/v2/util"
+ "time"
+)
+
+const MB = 1024 * 1024
+
+type supervisedPool struct {
+ cfg SupervisorConfig
+ events *util.EventHandler
+ pool Pool
+ stopCh chan struct{}
+}
+
+func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
+ return &supervisedPool{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ stopCh: make(chan struct{}),
+ }
+}
+
+func (sp *supervisedPool) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.control()
+ }
+ }
+ }()
+}
+
+func (sp *supervisedPool) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervisedPool) control() {
+ now := time.Now()
+ ctx := context.TODO()
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == StateInvalid {
+ continue
+ }
+
+ s, err := WorkerProcessState(workers[i])
+ if err != nil {
+ // worker is no longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ // TODO events
+ //sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)}
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ // first we check the worker idle TTL
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != StateReady {
+ continue
+ }
+
+ /*
+ Calculate the idle time:
+ if the worker is in StateReady, read its LastUsed timestamp (UnixNano, uint64).
+ For example, with IdleTTL equal to 5s, if (time.Now - LastUsed) > IdleTTL,
+ the worker has exceeded its allowed idle time and has to be removed.
+ */
+
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+
+ // convert last used to unixNano and sub time.now
+ res := int64(lu) - now.UnixNano()
+
+ // maxWorkerIdle more than diff between now and last used
+ if sp.cfg.IdleTTL-res <= 0 {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+
+ // the very last step is to calculate pool memory usage (except excluded workers)
+ //totalUsedMemory += s.MemoryUsage
+ }
+
+ //// if current usage more than max allowed pool memory usage
+ //if totalUsedMemory > sp.maxPoolMemory {
+ // sp.pool.Destroy(ctx)
+ //}
+}
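
The watcher is not wired into the pool yet; a plausible wiring once the TODO in static_pool.go (below) is resolved:

    // inside NewPool, after the initial workers are allocated
    p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
    p.sp.Start()

    // with a matching p.sp.Stop() when the pool is destroyed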
diff --git a/socket_factory.go b/socket_factory.go
index 27558cce..0db7849b 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -2,6 +2,7 @@ package roadrunner
import (
"context"
+ "github.com/shirou/gopsutil/process"
"net"
"os/exec"
"strings"
@@ -110,6 +111,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
w.Kill(context.Background()),
w.Wait(context.Background()),
)
+
c <- socketSpawn{
w: nil,
err: err,
@@ -178,10 +180,16 @@ func (f *SocketFactory) Close(ctx context.Context) error {
// waits for WorkerProcess to connect over socket and returns associated relay of timeout
func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) {
+ ticker := time.NewTicker(time.Millisecond * 100)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
+ case <-ticker.C:
+ _, err := process.NewProcess(int32(w.Pid()))
+ if err != nil {
+ return nil, err
+ }
default:
tmp, ok := f.relays.Load(w.Pid())
if !ok {
diff --git a/socket_factory_test.go b/socket_factory_test.go
index cfb95ca1..6ab87872 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -98,28 +98,29 @@ func Test_Tcp_StartError(t *testing.T) {
assert.Nil(t, w)
}
-// func Test_Tcp_Failboot(t *testing.T) {
-// time.Sleep(time.Millisecond * 10) // to ensure free socket
-//
-// ls, err := net.Listen("tcp", "localhost:9007")
-// if assert.NoError(t, err) {
-// defer func() {
-// err3 := ls.Close()
-// if err3 != nil {
-// t.Errorf("error closing the listener: error %v", err3)
-// }
-// }()
-// } else {
-// t.Skip("socket is busy")
-// }
-//
-// cmd := exec.Command("php", "tests/failboot.php")
-//
-// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd)
-// assert.Nil(t, w)
-// assert.Error(t, err2)
-// assert.Contains(t, err2.Error(), "failboot")
-//}
+func Test_Tcp_Failboot(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err3 := ls.Close()
+ if err3 != nil {
+ t.Errorf("error closing the listener: error %v", err3)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/failboot.php")
+
+ w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err2)
+ assert.Contains(t, err2.Error(), "failboot")
+}
func Test_Tcp_Timeout(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
@@ -161,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -208,7 +209,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -248,7 +249,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -304,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -393,7 +394,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -436,7 +437,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -512,7 +513,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -580,7 +581,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/static_pool.go b/static_pool.go
index 0c2352ad..31923134 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -3,23 +3,21 @@ package roadrunner
import (
"context"
"fmt"
+ "github.com/spiral/roadrunner/v2/util"
"os/exec"
"sync"
"github.com/pkg/errors"
)
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- // pool behaviour
- cfg *Config
+ cfg Config
// worker command creator
cmd func() *exec.Cmd
@@ -27,30 +25,31 @@ type StaticPool struct {
// creates and connects to stack
factory Factory
+ // distributes the events
+ events *util.EventHandler
+
// protects state of worker list, does not affect allocation
muw sync.RWMutex
- ww *WorkersWatcher
+ // manages worker states and TTLs
+ ww *workerWatcher
- events chan PoolEvent
-}
-type PoolEvent struct {
- Payload interface{}
+ // supervises memory and TTL of workers
+ sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-// TODO why cfg is passed by pointer?
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
cfg.InitDefaults()
p := &StaticPool{
cfg: cfg,
cmd: cmd,
factory: factory,
- events: make(chan PoolEvent),
+ events: &util.EventHandler{},
}
- p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
+ p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
if err != nil {
return nil, err
@@ -74,12 +73,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
return nil, err
}
+ // todo: implement
+ //p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ //p.sp.Start()
+
return p, nil
}
+// AddListener connects event listener to the pool.
+func (p *StaticPool) AddListener(listener util.EventListener) {
+ p.events.AddListener(listener)
+}
+
// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
- return *p.cfg
+func (p *StaticPool) GetConfig() Config {
+ return p.cfg
}
// Workers returns worker list associated with the pool.
@@ -103,18 +111,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
rsp, err := sw.Exec(rqs)
if err != nil {
- errJ := p.checkMaxJobs(bCtx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
// soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
+ if _, jobError := err.(JobError); jobError {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(bCtx)
+ if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ p.ww.PushWorker(w)
+ }
+
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
+ p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
+
if errS != nil {
return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
}
@@ -127,9 +147,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- return EmptyPayload, err
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.ExecWithContext(bCtx, rqs)
+
+ return p.Exec(rqs)
}
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
@@ -146,81 +167,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
}
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- // todo: why TODO passed here?
- getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
- defer cancel()
- w, err := p.ww.GetFreeWorker(getWorkerCtx)
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
- } else if err != nil {
- return EmptyPayload, err
- }
-
- sw := w.(SyncWorker)
-
- var execCtx context.Context
- if p.cfg.ExecTTL != 0 {
- var cancel2 context.CancelFunc
- execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
- defer cancel2()
- } else {
- execCtx = ctx
- }
-
- rsp, err := sw.ExecWithContext(execCtx, rqs)
- if err != nil {
- errJ := p.checkMaxJobs(ctx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
- // soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
- return EmptyPayload, err
- }
-
- sw.State().Set(StateInvalid)
- errS := w.Stop(ctx)
- if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
- }
-
- return EmptyPayload, err
- }
-
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(StateInvalid)
- err = w.Stop(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- return p.ExecWithContext(ctx, rqs)
- }
-
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- } else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
- }
- return rsp, nil
-}
+//func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+// // todo: why TODO passed here?
+// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
+// defer cancel()
+// w, err := p.ww.GetFreeWorker(getWorkerCtx)
+// if err != nil && errors.Is(err, ErrWatcherStopped) {
+// return EmptyPayload, ErrWatcherStopped
+// } else if err != nil {
+// return EmptyPayload, err
+// }
+//
+// sw := w.(SyncWorker)
+//
+// // todo: implement worker destroy
+// //execCtx context.Context
+// //if p.cfg.Supervisor.ExecTTL != 0 {
+// // var cancel2 context.CancelFunc
+// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
+// // defer cancel2()
+// //} else {
+// // execCtx = ctx
+// //}
+//
+// rsp, err := sw.Exec(rqs)
+// if err != nil {
+// errJ := p.checkMaxJobs(ctx, w)
+// if errJ != nil {
+// // todo: worker was not destroyed
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+// }
+//
+// // soft job errors are allowed
+// if _, jobError := err.(JobError); jobError {
+// p.ww.PushWorker(w)
+// return EmptyPayload, err
+// }
+//
+// sw.State().Set(StateInvalid)
+// errS := w.Stop(ctx)
+// if errS != nil {
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+// }
+//
+// return EmptyPayload, err
+// }
+//
+// // worker wants to be terminated
+// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+// w.State().Set(StateInvalid)
+// err = w.Stop(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// return p.ExecWithContext(ctx, rqs)
+// }
+//
+// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+// err = p.ww.AllocateNew(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// } else {
+// p.muw.Lock()
+// p.ww.PushWorker(w)
+// p.muw.Unlock()
+// }
+//
+// return rsp, nil
+//}
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {
p.ww.Destroy(ctx)
}
-func (p *StaticPool) Events() chan PoolEvent {
- return p.events
-}
-
// allocate required number of stack
func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
var workers []WorkerBase
@@ -243,6 +264,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
err := p.ww.AllocateNew(ctx)
if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
return err
}
}
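
An end-to-end sketch of the reworked pool API (Config passed by value, Exec without context), assuming a PHP worker script as in the tests below:

    ctx := context.Background()
    p, err := NewPool(
        ctx,
        func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
        NewPipeFactory(),
        Config{NumWorkers: 2, AllocateTimeout: time.Second, DestroyTimeout: time.Second},
    )
    if err != nil {
        log.Fatal(err)
    }
    defer p.Destroy(ctx)

    res, err := p.Exec(Payload{Body: []byte("hello")})
    if err != nil {
        log.Fatal(err)
    }
    log.Println(string(res.Body))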
diff --git a/static_pool_test.go b/static_pool_test.go
index ce9e6820..4a0c483a 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"log"
"os/exec"
"runtime"
@@ -18,7 +17,6 @@ var cfg = Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
}
func Test_NewPool(t *testing.T) {
@@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
- assert.Equal(t, cfg, p.Config())
-
defer p.Destroy(ctx)
assert.NotNil(t, p)
@@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.Nil(t, p)
@@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -143,31 +139,31 @@ func Test_StaticPool_JobError(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, TaskError{}, err)
+ assert.IsType(t, JobError{}, err)
assert.Equal(t, "hello", err.Error())
}
// TODO temporary commented, figure out later
-// func Test_StaticPool_Broken_Replace(t *testing.T) {
+//func Test_StaticPool_Broken_Replace(t *testing.T) {
// ctx := context.Background()
// p, err := NewPool(
// ctx,
// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
// NewPipeFactory(),
-// &cfg,
+// cfg,
// )
// assert.NoError(t, err)
// assert.NotNil(t, p)
@@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) {
// var i int64
// atomic.StoreInt64(&i, 10)
//
+// p.AddListener(func(event interface{}) {
+//
+// })
+//
// go func() {
// for {
// select {
@@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
// Consume pool events
wg := sync.WaitGroup{}
wg.Add(1)
- go func() {
- for true {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
- wg.Done()
- }
+ p.AddListener(func(event interface{}) {
+ if pe, ok := event.(PoolEvent); ok {
+ if pe.Event == EventWorkerConstruct {
+ wg.Done()
}
}
- }()
+ })
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill(ctx)
@@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
- ExecTTL: time.Second * 4,
},
)
assert.Error(t, err)
@@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
assert.NoError(t, err)
@@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 15,
},
)
assert.NoError(t, err)
@@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NotNil(t, p)
- go func() {
- for {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- }
- }
- }()
-
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err := p.Exec(Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
assert.NoError(t, err)
@@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(StateErrored)
}
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
@@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
if err != nil {
b.Fatal(err)
@@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/sync_worker.go b/sync_worker.go
index de9491d6..cbc2cc0b 100644
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -3,6 +3,7 @@ package roadrunner
import (
"context"
"fmt"
+ "github.com/spiral/roadrunner/v2/util"
"time"
"github.com/pkg/errors"
@@ -14,19 +15,17 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
+
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
-
- // ExecWithContext allow to set ExecTTL
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
-type taskWorker struct {
+type syncWorker struct {
w WorkerBase
}
func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
- return &taskWorker{
+ return &syncWorker{
w: w,
}, nil
}
@@ -36,68 +35,9 @@ type twexec struct {
err error
}
-func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- c := make(chan twexec)
- go func() {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("payload can not be empty"),
- }
- return
- }
-
- if tw.w.State().Value() != StateReady {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()),
- }
- return
- }
-
- // set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(rqs)
- if err != nil {
- if _, ok := err.(TaskError); !ok {
- tw.w.State().Set(StateErrored)
- tw.w.State().RegisterExec()
- }
- c <- twexec{
- payload: EmptyPayload,
- err: err,
- }
- return
- }
-
- tw.w.State().Set(StateReady)
- tw.w.State().RegisterExec()
- c <- twexec{
- payload: rsp,
- err: nil,
- }
- return
- }()
-
- for {
- select {
- case <-ctx.Done():
- return EmptyPayload, ctx.Err()
- case res := <-c:
- if res.err != nil {
- return EmptyPayload, res.err
- }
-
- return res.payload, nil
- }
- }
-}
-
-//
-func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+// Exec executes the payload synchronously, without any TTL timeout.
+func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
return EmptyPayload, fmt.Errorf("payload can not be empty")
}
@@ -109,9 +49,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.w.State().Set(StateWorking)
- rsp, err := tw.execPayload(rqs)
+ rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(TaskError); !ok {
+ if _, ok := err.(JobError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -124,7 +64,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
return EmptyPayload, errors.Wrap(err, "header error")
@@ -147,7 +87,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, TaskError(rsp.Context)
+ return EmptyPayload, JobError(rsp.Context)
}
// add streaming support :)
@@ -158,46 +98,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) String() string {
+func (tw *syncWorker) String() string {
return tw.w.String()
}
-func (tw *taskWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *syncWorker) Pid() int64 {
+ return tw.w.Pid()
}
-func (tw *taskWorker) Events() <-chan WorkerEvent {
- return tw.w.Events()
+func (tw *syncWorker) Created() time.Time {
+ return tw.w.Created()
}
-func (tw *taskWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *syncWorker) AddListener(listener util.EventListener) {
+ tw.w.AddListener(listener)
}
-func (tw *taskWorker) State() State {
+func (tw *syncWorker) State() State {
return tw.w.State()
}
-func (tw *taskWorker) Start() error {
+func (tw *syncWorker) Start() error {
return tw.w.Start()
}
-func (tw *taskWorker) Wait(ctx context.Context) error {
+func (tw *syncWorker) Wait(ctx context.Context) error {
return tw.w.Wait(ctx)
}
-func (tw *taskWorker) Stop(ctx context.Context) error {
+func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *taskWorker) Kill(ctx context.Context) error {
+func (tw *syncWorker) Kill(ctx context.Context) error {
return tw.w.Kill(ctx)
}
-func (tw *taskWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() goridge.Relay {
return tw.w.Relay()
}
-func (tw *taskWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
tw.w.AttachRelay(rl)
}
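
With ExecWithContext gone, SyncWorker is reduced to a timeout-free wrapper around WorkerBase; TTL enforcement, previously handled by ExecWithContext, now has to be applied by the caller. A minimal sketch of the remaining flow, assuming the worker was already spawned by a pipe or socket factory (so a relay is attached and the state is StateReady) and that the root package imports as github.com/spiral/roadrunner/v2:

package example

import (
	"fmt"

	rr "github.com/spiral/roadrunner/v2" // assumed import path for the root package
)

// execEcho wraps an already-spawned worker and sends a single payload through
// the new synchronous Exec method.
func execEcho(w rr.WorkerBase) ([]byte, error) {
	sw, err := rr.NewSyncWorker(w)
	if err != nil {
		return nil, err
	}

	res, err := sw.Exec(rr.Payload{Body: []byte("hello")})
	if err != nil {
		// JobError is a job-level failure: the worker process itself stays usable.
		if job, ok := err.(rr.JobError); ok {
			return nil, fmt.Errorf("job failed: %s", job.Error())
		}
		return nil, err
	}
	return res.Body, nil
}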
diff --git a/sync_worker_test.go b/sync_worker_test.go
index f4868009..ad1513d7 100644
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -2,13 +2,10 @@ package roadrunner
import (
"context"
- "errors"
+ "github.com/stretchr/testify/assert"
"os/exec"
"sync"
"testing"
- "time"
-
- "github.com/stretchr/testify/assert"
)
func Test_Echo(t *testing.T) {
@@ -34,7 +31,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -65,7 +62,7 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := syncWorker.ExecWithContext(ctx, EmptyPayload)
+ res, err := syncWorker.Exec(EmptyPayload)
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -84,7 +81,6 @@ func Test_NotStarted_String(t *testing.T) {
}
func Test_NotStarted_Exec(t *testing.T) {
- ctx := context.Background()
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
w, _ := InitBaseWorker(cmd)
@@ -94,7 +90,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -143,7 +139,7 @@ func Test_Echo_Slow(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -164,28 +160,34 @@ func Test_Broken(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
- go func() {
- assert.NotNil(t, w)
- tt := time.NewTimer(time.Second * 10)
- defer wg.Done()
- for {
- select {
- case ev := <-w.Events():
- assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()")
- return
- case <-tt.C:
- assert.Error(t, errors.New("no events from worker"))
- return
- }
- }
- }()
+
+ w.AddListener(func(event interface{}) {
+ assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()")
+ wg.Done()
+ })
+
+ //go func() {
+ // assert.NotNil(t, w)
+ // tt := time.NewTimer(time.Second * 10)
+ // defer wg.Done()
+ // for {
+ // select {
+ // case ev := <-w.Events():
+ // assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()")
+ // return
+ // case <-tt.C:
+ // assert.Error(t, errors.New("no events from worker"))
+ // return
+ // }
+ // }
+ //}()
syncWorker, err := NewSyncWorker(w)
if err != nil {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -215,12 +217,12 @@ func Test_Error(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, TaskError{}, err)
+ assert.IsType(t, JobError{}, err)
assert.Equal(t, "hello", err.Error())
}
@@ -244,19 +246,19 @@ func Test_NumExecs(t *testing.T) {
t.Fatal(err)
}
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/util/events.go b/util/events.go
new file mode 100644
index 00000000..9e12c4f7
--- /dev/null
+++ b/util/events.go
@@ -0,0 +1,26 @@
+package util
+
+// EventListener listens for events produced by a worker, worker pool or other service.
+type EventListener func(event interface{})
+
+// EventHandler helps to broadcast events to multiple listeners.
+type EventHandler struct {
+ listeners []EventListener
+}
+
+// NumListeners returns the number of registered event listeners.
+func (eb *EventHandler) NumListeners() int {
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *EventHandler) AddListener(listener EventListener) {
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcasts the event to all registered listeners.
+func (eb *EventHandler) Push(e interface{}) {
+ for _, listener := range eb.listeners {
+ listener(e)
+ }
+}
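
util/events.go is the replacement for the buffered event channels removed below: listeners are registered once and Push invokes them synchronously, in registration order, on the caller's goroutine. A minimal usage sketch of just the API defined above:

package example

import (
	"log"

	"github.com/spiral/roadrunner/v2/util"
)

func listen() {
	eh := &util.EventHandler{}

	// Each listener receives every pushed event; payloads are plain interface{} values.
	eh.AddListener(func(event interface{}) {
		log.Printf("event: %#v", event)
	})

	// Push calls all registered listeners synchronously, in order.
	eh.Push("worker restarted")

	log.Println("listeners registered:", eh.NumListeners()) // 1
}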
diff --git a/worker.go b/worker.go
index c0a735c2..05b5712d 100644
--- a/worker.go
+++ b/worker.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/spiral/roadrunner/v2/util"
"os"
"os/exec"
"strconv"
@@ -15,6 +16,12 @@ import (
"go.uber.org/multierr"
)
+const (
+ // WaitDuration - how long the error buffer aggregates error messages after the
+ // last write before flushing the merged output (keeps related error lines together).
+ WaitDuration = 25 * time.Millisecond
+)
+
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
@@ -22,38 +29,31 @@ const (
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
-
- // EventWorkerWaitDone triggered when worker exit from process Wait
- EventWorkerWaitDone // todo: implemented?
-
- EventWorkerBufferClosed
-
- EventRelayCloseError
-
- EventWorkerProcessError
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 100 * time.Millisecond
-)
-
-// todo: write comment
+// WorkerEvent wraps worker events.
type WorkerEvent struct {
- Event int64
- Worker WorkerBase
+ // Event id (one of the EventWorker* constants above).
+ Event int64
+
+ // Worker that triggered the event.
+ Worker WorkerBase
+
+ // Event-specific payload.
Payload interface{}
}
type WorkerBase interface {
fmt.Stringer
- Created() time.Time
+ // Pid returns worker pid.
+ Pid() int64
- Events() <-chan WorkerEvent
+ // Created returns the time the worker was created.
+ Created() time.Time
- Pid() int64
+ // AddListener attaches a listener to consume worker events.
+ AddListener(listener util.EventListener)
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
@@ -88,7 +88,7 @@ type WorkerProcess struct {
created time.Time
// updates parent supervisor or pool about WorkerProcess events
- events chan WorkerEvent
+ events *util.EventHandler
// state holds information about current WorkerProcess state,
// number of WorkerProcess executions, buf status change time.
@@ -129,7 +129,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
w := &WorkerProcess{
created: time.Now(),
- events: make(chan WorkerEvent, 10),
+ events: &util.EventHandler{},
cmd: cmd,
state: newState(StateInactive),
}
@@ -142,12 +142,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
return w, nil
}
+// Pid returns worker pid.
+func (w *WorkerProcess) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns the time the worker was created.
func (w *WorkerProcess) Created() time.Time {
return w.created
}
-func (w *WorkerProcess) Pid() int64 {
- return int64(w.pid)
+// AddListener registers a new worker event listener.
+func (w *WorkerProcess) AddListener(listener util.EventListener) {
+ w.events.AddListener(listener)
+
+ w.errBuffer.mu.Lock()
+ w.errBuffer.enable = true
+ w.errBuffer.mu.Unlock()
}
// State return receive-only WorkerProcess state object, state can be used to safely access
@@ -195,10 +206,6 @@ func (w *WorkerProcess) Start() error {
return nil
}
-func (w *WorkerProcess) Events() <-chan WorkerEvent {
- return w.events
-}
-
// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
@@ -208,15 +215,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error {
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(StateErrored)
- // if there are messages in the events channel, read it
- // TODO potentially danger place
- if len(w.events) > 0 {
- select {
- case ev := <-w.events:
- err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
- }
- }
- // if no errors in the events, error might be in the errbuffer
+
+ // stderr output (if any) is aggregated in the errBuffer and appended to the error
if w.errBuffer.Len() > 0 {
err = multierr.Append(err, errors.New(w.errBuffer.String()))
}
@@ -250,6 +250,7 @@ func (w *WorkerProcess) closeRelay() error {
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
func (w *WorkerProcess) Stop(ctx context.Context) error {
c := make(chan error)
+
go func() {
var err error
w.errBuffer.Close()
@@ -264,6 +265,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
w.state.Set(StateStopped)
c <- nil
}()
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -290,16 +292,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error {
}
func (w *WorkerProcess) logCallback(log []byte) {
- w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
}
// thread safe errBuffer
type errBuffer struct {
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo remove update
+ enable bool
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
+ // todo: remove update
update chan interface{}
stop chan interface{}
logCallback func(log []byte)
@@ -321,7 +324,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Reset(WaitDuration)
case <-eb.wait.C:
eb.mu.Lock()
- if len(eb.buf) > eb.last {
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.buf = eb.buf[0:0]
eb.last = len(eb.buf)
@@ -331,11 +334,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Stop()
eb.mu.Lock()
- if len(eb.buf) > eb.last {
- if eb == nil || eb.logCallback == nil {
- eb.mu.Unlock()
- return
- }
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.last = len(eb.buf)
}
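
Consumers of worker events migrate from draining the Events() channel to attaching a callback; note that AddListener also sets errBuffer.enable, so stderr is only forwarded as EventWorkerLog once at least one listener exists. A minimal sketch of consuming worker logs, assuming the root package imports as github.com/spiral/roadrunner/v2:

package example

import (
	"log"
	"os/exec"

	rr "github.com/spiral/roadrunner/v2" // assumed import path for the root package
)

func watchLogs() error {
	cmd := exec.Command("php", "tests/client.php", "echo", "pipes")

	w, err := rr.InitBaseWorker(cmd)
	if err != nil {
		return err
	}

	// AddListener replaces the old Events() channel and enables the stderr buffer.
	w.AddListener(func(event interface{}) {
		if we, ok := event.(rr.WorkerEvent); ok && we.Event == rr.EventWorkerLog {
			log.Printf("worker %d: %s", we.Worker.Pid(), we.Payload.([]byte))
		}
	})

	return w.Start()
}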
diff --git a/worker_test.go b/worker_test.go
index a90b7ef2..d2744345 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -91,6 +91,7 @@ func TestErrBuffer_Write_Event(t *testing.T) {
assert.Equal(t, []byte("hello\n"), log)
wg.Done()
}
+ buf.enable = true
_, err := buf.Write([]byte("hello\n"))
if err != nil {
@@ -116,6 +117,8 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) {
assert.Equal(t, []byte("hello\nending"), log)
wg.Done()
}
+ buf.enable = true
+
_, err := buf.Write([]byte("hel"))
if err != nil {
t.Errorf("fail to write: error %v", err)
diff --git a/workers_watcher.go b/worker_watcher.go
index d9d27196..773f7745 100644
--- a/workers_watcher.go
+++ b/worker_watcher.go
@@ -3,6 +3,7 @@ package roadrunner
import (
"context"
"errors"
+ "github.com/spiral/roadrunner/v2/util"
"sync"
"time"
)
@@ -59,36 +60,36 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
return w, false
}
-type WorkersWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator func(args ...interface{}) (WorkerBase, error)
- initialNumWorkers int64
- actualNumWorkers int64
- events chan PoolEvent
-}
-
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
AddToWatch(ctx context.Context, workers []WorkerBase) error
+
// GetFreeWorker provide first free worker
GetFreeWorker(ctx context.Context) (WorkerBase, error)
+
// PutWorker enqueues worker back
PushWorker(w WorkerBase)
+
// AllocateNew used to allocate new worker and put in into the WorkerWatcher
AllocateNew(ctx context.Context) error
+
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
+
// WorkersList return all stack w/o removing it from internal storage
WorkersList() []WorkerBase
+
// RemoveWorker remove worker from the stack
RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
- // todo check if events not nil
- ww := &WorkersWatcher{
+func newWorkerWatcher(
+ allocator func(args ...interface{}) (WorkerBase, error),
+ numWorkers int64,
+ events *util.EventHandler,
+) *workerWatcher {
+ ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
initialNumWorkers: numWorkers,
@@ -99,14 +100,23 @@ func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n
return ww
}
-func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+type workerWatcher struct {
+ mutex sync.RWMutex
+ stack *Stack
+ allocator func(args ...interface{}) (WorkerBase, error)
+ initialNumWorkers int64
+ actualNumWorkers int64
+ events *util.EventHandler
+}
+
+func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
for i := 0; i < len(workers); i++ {
sw, err := NewSyncWorker(workers[i])
if err != nil {
return err
}
ww.stack.Push(sw)
- ww.watch(sw)
+ sw.AddListener(ww.events.Push)
go func(swc WorkerBase) {
ww.wait(ctx, swc)
@@ -115,12 +125,13 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase)
return nil
}
-func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
return nil, ErrWatcherStopped
}
+
// handle worker remove state
// in this state worker is destroyed by supervisor
if w != nil && w.State().Value() == StateRemove {
@@ -131,6 +142,7 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// try to get next
return ww.GetFreeWorker(ctx)
}
+
// no free stack
if w == nil {
tout := time.NewTicker(time.Second * 180)
@@ -152,23 +164,31 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
}
}
+
ww.decreaseNumOfActualWorkers()
return w, nil
}
-func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
+func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
ww.stack.mutex.Lock()
sw, err := ww.allocator()
if err != nil {
return err
}
+
ww.addToWatch(sw)
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
+
+ ww.events.Push(PoolEvent{
+ Event: EventWorkerConstruct,
+ Payload: sw,
+ })
+
return nil
}
-func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
ww.stack.mutex.Lock()
defer ww.stack.mutex.Unlock()
pid := wb.Pid()
@@ -193,19 +213,19 @@ func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// O(1) operation
-func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
+func (ww *workerWatcher) PushWorker(w WorkerBase) {
ww.mutex.Lock()
ww.actualNumWorkers++
ww.mutex.Unlock()
ww.stack.Push(w)
}
-func (ww *WorkersWatcher) ReduceWorkersCount() {
+func (ww *workerWatcher) ReduceWorkersCount() {
ww.decreaseNumOfActualWorkers()
}
// Destroy all underlying stack (but let them to complete the task)
-func (ww *WorkersWatcher) Destroy(ctx context.Context) {
+func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.stack.mutex.Lock()
ww.stack.destroy = true
ww.stack.mutex.Unlock()
@@ -238,67 +258,63 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation
-func (ww *WorkersWatcher) WorkersList() []WorkerBase {
+func (ww *workerWatcher) WorkersList() []WorkerBase {
return ww.stack.workers
}
-func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) {
+func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
err := w.Wait(ctx)
if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
+ ww.events.Push(WorkerEvent{
Event: EventWorkerError,
Worker: w,
Payload: err,
- }}
+ })
}
- // If not destroyed, reallocate
- if w.State().Value() != StateDestroyed {
- pid := w.Pid()
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // worker in the stack, reallocating
- if ww.stack.workers[i].Pid() == pid {
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
- ww.stack.mutex.Unlock()
- err = ww.AllocateNew(ctx)
- if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerError,
- Worker: w,
- Payload: err,
- }}
- return
- }
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerConstruct,
- Worker: nil,
- Payload: nil,
- }}
- return
+
+ if w.State().Value() == StateDestroyed {
+ // worker was manually destroyed, no need to replace
+ return
+ }
+
+ pid := w.Pid()
+ ww.stack.mutex.Lock()
+ for i := 0; i < len(ww.stack.workers); i++ {
+ // worker in the stack, reallocating
+ if ww.stack.workers[i].Pid() == pid {
+
+ ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+ ww.decreaseNumOfActualWorkers()
+ ww.stack.mutex.Unlock()
+
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events.Push(PoolEvent{
+ Event: EventPoolError,
+ Payload: err,
+ })
}
- }
- ww.stack.mutex.Unlock()
- // worker not in the stack (not returned), forget and allocate new
- err = ww.AllocateNew(ctx)
- if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerError,
- Worker: w,
- Payload: err,
- }}
+
return
}
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerConstruct,
- Worker: nil,
- Payload: nil,
- }}
}
+
+ ww.stack.mutex.Unlock()
+
+ // worker not in the stack (not returned), forget and allocate new
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events.Push(PoolEvent{
+ Event: EventPoolError,
+ Payload: err,
+ })
+ return
+ }
+
return
}
-func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
+func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
@@ -306,18 +322,8 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
}()
}
-func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
+func (ww *workerWatcher) decreaseNumOfActualWorkers() {
ww.mutex.Lock()
ww.actualNumWorkers--
ww.mutex.Unlock()
}
-
-func (ww *WorkersWatcher) watch(swc WorkerBase) {
- // todo make event to stop function
- go func() {
- select {
- case ev := <-swc.Events():
- ww.events <- PoolEvent{Payload: ev}
- }
- }()
-}
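
Since both the watcher type and its constructor are unexported now, wiring happens inside the roadrunner package itself. A hypothetical, package-internal sketch of that wiring, with spawn standing in for the pool's real allocator and 4 as an arbitrary worker count:

package roadrunner

import (
	"context"

	"github.com/spiral/roadrunner/v2/util"
)

// wireWatcher is a hypothetical helper: one shared util.EventHandler receives
// both PoolEvent and WorkerEvent values pushed by the watcher and its workers.
func wireWatcher(ctx context.Context, spawn func(args ...interface{}) (WorkerBase, error)) (*workerWatcher, error) {
	events := &util.EventHandler{}
	events.AddListener(func(event interface{}) {
		// inspect PoolEvent / WorkerEvent values here
		_ = event
	})

	ww := newWorkerWatcher(spawn, 4, events)

	// Pre-spawn the initial workers; AddToWatch wraps each in a SyncWorker,
	// subscribes the shared handler and starts wait() on it.
	workers := make([]WorkerBase, 0, 4)
	for i := 0; i < 4; i++ {
		w, err := spawn()
		if err != nil {
			return nil, err
		}
		workers = append(workers, w)
	}
	if err := ww.AddToWatch(ctx, workers); err != nil {
		return nil, err
	}
	return ww, nil
}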