diff options
-rw-r--r-- | service/http/config_test.go | 51 | ||||
-rw-r--r-- | src/qbuild/.build.json (renamed from qbuild/.build.json) | 0 | ||||
-rw-r--r-- | src/qbuild/docker/Dockerfile (renamed from qbuild/docker/Dockerfile) | 0 | ||||
-rw-r--r-- | src/qbuild/docker/compile.sh (renamed from qbuild/docker/compile.sh) | 0 | ||||
-rw-r--r-- | src/qbuild/main.go (renamed from qbuild/main.go) | 0 | ||||
-rw-r--r-- | src/qbuild/rr-build (renamed from qbuild/rr-build) | 0 | ||||
-rw-r--r-- | src/qbuild/src/Builder.php (renamed from qbuild/src/Builder.php) | 0 | ||||
-rw-r--r-- | state.go | 3 | ||||
-rw-r--r-- | static_pool.go | 30 | ||||
-rw-r--r-- | watcher.go | 8 | ||||
-rw-r--r-- | worker.go | 4 |
11 files changed, 43 insertions, 53 deletions
diff --git a/service/http/config_test.go b/service/http/config_test.go index 7c9d629a..4cd2783f 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -222,54 +222,3 @@ func Test_Config_InvalidAddress(t *testing.T) { assert.Error(t, cfg.Valid()) } - -func Test_Config_MergeBackwardCompatibility_MaxRequestSize(t *testing.T) { - c := newTestBackwardCompatibilityConfig() - cfg := mockCfg{cfg: `{"maxRequestSize":512}`} - assert.NoError(t, c.Valid()) - assert.NoError(t, c.Hydrate(&cfg)) - assert.Equal(t, int64(512), int64(c.MaxRequestSize)) - - c = newTestBackwardCompatibilityConfig() - cfg = mockCfg{cfg: `{"maxRequestSize":256,"maxRequest":1024}`} - assert.NoError(t, c.Valid()) - assert.NoError(t, c.Hydrate(&cfg)) - assert.Equal(t, int64(256), int64(c.MaxRequestSize)) - - c = newTestBackwardCompatibilityConfig() - cfg = mockCfg{cfg: `{"maxRequestSize":256,"maxRequest":0}`} - assert.NoError(t, c.Valid()) - assert.NoError(t, c.Hydrate(&cfg)) - assert.Equal(t, int64(256), int64(c.MaxRequestSize)) - - c = newTestBackwardCompatibilityConfig() - cfg = mockCfg{cfg: `{"maxRequest":1024}`} - assert.NoError(t, c.Valid()) - assert.NoError(t, c.Hydrate(&cfg)) - assert.Equal(t, int64(1024), int64(c.MaxRequestSize)) - - c = newTestBackwardCompatibilityConfig() - cfg = mockCfg{cfg: `{"maxRequestSize":0,"maxRequest":1024}`} - assert.NoError(t, c.Valid()) - assert.NoError(t, c.Hydrate(&cfg)) - assert.Equal(t, int64(1024), int64(c.MaxRequestSize)) -} - -func newTestBackwardCompatibilityConfig() *Config { - return &Config{ - Address: ":8080", - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - Workers: &roadrunner.ServerConfig{ - Command: "php tests/client.php echo pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - }, - } -} diff --git a/qbuild/.build.json b/src/qbuild/.build.json index 74b83cea..74b83cea 100644 --- a/qbuild/.build.json +++ b/src/qbuild/.build.json diff --git a/qbuild/docker/Dockerfile b/src/qbuild/docker/Dockerfile index 11fb5abb..11fb5abb 100644 --- a/qbuild/docker/Dockerfile +++ b/src/qbuild/docker/Dockerfile diff --git a/qbuild/docker/compile.sh b/src/qbuild/docker/compile.sh index 0c85124f..0c85124f 100644 --- a/qbuild/docker/compile.sh +++ b/src/qbuild/docker/compile.sh diff --git a/qbuild/main.go b/src/qbuild/main.go index a065fe27..a065fe27 100644 --- a/qbuild/main.go +++ b/src/qbuild/main.go diff --git a/qbuild/rr-build b/src/qbuild/rr-build index 7918f4f8..7918f4f8 100644 --- a/qbuild/rr-build +++ b/src/qbuild/rr-build diff --git a/qbuild/src/Builder.php b/src/qbuild/src/Builder.php index 4b441094..4b441094 100644 --- a/qbuild/src/Builder.php +++ b/src/qbuild/src/Builder.php @@ -26,6 +26,9 @@ const ( // StateWorking - working on given payload. StateWorking + // StateDestroying - worker has been marked as being destroyed. + StateDestroying + // StateStreaming - indicates that worker is streaming the data at the moment. StateStreaming diff --git a/static_pool.go b/static_pool.go index c9473699..336ae520 100644 --- a/static_pool.go +++ b/static_pool.go @@ -46,8 +46,9 @@ type StaticPool struct { destroy chan interface{} // lsn is optional callback to handle worker create/destruct/error events. - mul sync.Mutex - lsn func(event int, ctx interface{}) + mul sync.Mutex + watcher Watcher + lsn func(event int, ctx interface{}) } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -80,6 +81,14 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } +// WatchWorkers enables worker watching. +func (p *StaticPool) WatchWorkers(w Watcher) { + p.mul.Lock() + defer p.mul.Unlock() + + p.watcher = w +} + // Listen attaches pool event watcher. func (p *StaticPool) Listen(l func(event int, ctx interface{})) { p.mul.Lock() @@ -181,6 +190,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } + if p.watcher != nil { + if keep, err := p.watcher.Keep(p, w); !keep { + i++ + w.markDestroying() + go p.destroyWorker(w, err) + } + } + return w, nil case <-p.destroy: return nil, fmt.Errorf("pool has been stopped") @@ -199,6 +216,15 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { atomic.AddInt64(&p.numDead, ^int64(0)) continue } + + if p.watcher != nil { + if keep, err := p.watcher.Keep(p, w); !keep { + i++ + w.markDestroying() + go p.destroyWorker(w, err) + } + } + return w, nil case <-p.destroy: return nil, fmt.Errorf("pool has been stopped") diff --git a/watcher.go b/watcher.go new file mode 100644 index 00000000..bbc7b9dc --- /dev/null +++ b/watcher.go @@ -0,0 +1,8 @@ +package roadrunner + +// Watcher watches for workers. +type Watcher interface { + // Keep must return true and nil if worker is OK to continue working, + // must return false and optional error to force worker destruction. + Keep(p Pool, w *Worker) (keep bool, err error) +} @@ -192,6 +192,10 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { return rsp, err } +func (w *Worker) markDestroying() { + w.state.set(StateDestroying) +} + func (w *Worker) start() error { if err := w.cmd.Start(); err != nil { close(w.waitDone) |