summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--service/http/config_test.go51
-rw-r--r--src/qbuild/.build.json (renamed from qbuild/.build.json)0
-rw-r--r--src/qbuild/docker/Dockerfile (renamed from qbuild/docker/Dockerfile)0
-rw-r--r--src/qbuild/docker/compile.sh (renamed from qbuild/docker/compile.sh)0
-rw-r--r--src/qbuild/main.go (renamed from qbuild/main.go)0
-rw-r--r--src/qbuild/rr-build (renamed from qbuild/rr-build)0
-rw-r--r--src/qbuild/src/Builder.php (renamed from qbuild/src/Builder.php)0
-rw-r--r--state.go3
-rw-r--r--static_pool.go30
-rw-r--r--watcher.go8
-rw-r--r--worker.go4
11 files changed, 43 insertions, 53 deletions
diff --git a/service/http/config_test.go b/service/http/config_test.go
index 7c9d629a..4cd2783f 100644
--- a/service/http/config_test.go
+++ b/service/http/config_test.go
@@ -222,54 +222,3 @@ func Test_Config_InvalidAddress(t *testing.T) {
assert.Error(t, cfg.Valid())
}
-
-func Test_Config_MergeBackwardCompatibility_MaxRequestSize(t *testing.T) {
- c := newTestBackwardCompatibilityConfig()
- cfg := mockCfg{cfg: `{"maxRequestSize":512}`}
- assert.NoError(t, c.Valid())
- assert.NoError(t, c.Hydrate(&cfg))
- assert.Equal(t, int64(512), int64(c.MaxRequestSize))
-
- c = newTestBackwardCompatibilityConfig()
- cfg = mockCfg{cfg: `{"maxRequestSize":256,"maxRequest":1024}`}
- assert.NoError(t, c.Valid())
- assert.NoError(t, c.Hydrate(&cfg))
- assert.Equal(t, int64(256), int64(c.MaxRequestSize))
-
- c = newTestBackwardCompatibilityConfig()
- cfg = mockCfg{cfg: `{"maxRequestSize":256,"maxRequest":0}`}
- assert.NoError(t, c.Valid())
- assert.NoError(t, c.Hydrate(&cfg))
- assert.Equal(t, int64(256), int64(c.MaxRequestSize))
-
- c = newTestBackwardCompatibilityConfig()
- cfg = mockCfg{cfg: `{"maxRequest":1024}`}
- assert.NoError(t, c.Valid())
- assert.NoError(t, c.Hydrate(&cfg))
- assert.Equal(t, int64(1024), int64(c.MaxRequestSize))
-
- c = newTestBackwardCompatibilityConfig()
- cfg = mockCfg{cfg: `{"maxRequestSize":0,"maxRequest":1024}`}
- assert.NoError(t, c.Valid())
- assert.NoError(t, c.Hydrate(&cfg))
- assert.Equal(t, int64(1024), int64(c.MaxRequestSize))
-}
-
-func newTestBackwardCompatibilityConfig() *Config {
- return &Config{
- Address: ":8080",
- Uploads: &UploadsConfig{
- Dir: os.TempDir(),
- Forbid: []string{".go"},
- },
- Workers: &roadrunner.ServerConfig{
- Command: "php tests/client.php echo pipes",
- Relay: "pipes",
- Pool: &roadrunner.Config{
- NumWorkers: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- },
- }
-}
diff --git a/qbuild/.build.json b/src/qbuild/.build.json
index 74b83cea..74b83cea 100644
--- a/qbuild/.build.json
+++ b/src/qbuild/.build.json
diff --git a/qbuild/docker/Dockerfile b/src/qbuild/docker/Dockerfile
index 11fb5abb..11fb5abb 100644
--- a/qbuild/docker/Dockerfile
+++ b/src/qbuild/docker/Dockerfile
diff --git a/qbuild/docker/compile.sh b/src/qbuild/docker/compile.sh
index 0c85124f..0c85124f 100644
--- a/qbuild/docker/compile.sh
+++ b/src/qbuild/docker/compile.sh
diff --git a/qbuild/main.go b/src/qbuild/main.go
index a065fe27..a065fe27 100644
--- a/qbuild/main.go
+++ b/src/qbuild/main.go
diff --git a/qbuild/rr-build b/src/qbuild/rr-build
index 7918f4f8..7918f4f8 100644
--- a/qbuild/rr-build
+++ b/src/qbuild/rr-build
diff --git a/qbuild/src/Builder.php b/src/qbuild/src/Builder.php
index 4b441094..4b441094 100644
--- a/qbuild/src/Builder.php
+++ b/src/qbuild/src/Builder.php
diff --git a/state.go b/state.go
index 4d8b1eaa..d3ac2a2f 100644
--- a/state.go
+++ b/state.go
@@ -26,6 +26,9 @@ const (
// StateWorking - working on given payload.
StateWorking
+ // StateDestroying - worker has been marked as being destroyed.
+ StateDestroying
+
// StateStreaming - indicates that worker is streaming the data at the moment.
StateStreaming
diff --git a/static_pool.go b/static_pool.go
index c9473699..336ae520 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -46,8 +46,9 @@ type StaticPool struct {
destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- lsn func(event int, ctx interface{})
+ mul sync.Mutex
+ watcher Watcher
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -80,6 +81,14 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
+// WatchWorkers enables worker watching.
+func (p *StaticPool) WatchWorkers(w Watcher) {
+ p.mul.Lock()
+ defer p.mul.Unlock()
+
+ p.watcher = w
+}
+
// Listen attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.mul.Lock()
@@ -181,6 +190,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
+ if p.watcher != nil {
+ if keep, err := p.watcher.Keep(p, w); !keep {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
+ }
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")
@@ -199,6 +216,15 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
+
+ if p.watcher != nil {
+ if keep, err := p.watcher.Keep(p, w); !keep {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
+ }
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")
diff --git a/watcher.go b/watcher.go
new file mode 100644
index 00000000..bbc7b9dc
--- /dev/null
+++ b/watcher.go
@@ -0,0 +1,8 @@
+package roadrunner
+
+// Watcher watches for workers.
+type Watcher interface {
+ // Keep must return true and nil if worker is OK to continue working,
+ // must return false and optional error to force worker destruction.
+ Keep(p Pool, w *Worker) (keep bool, err error)
+}
diff --git a/worker.go b/worker.go
index c52960b2..04b58e49 100644
--- a/worker.go
+++ b/worker.go
@@ -192,6 +192,10 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
return rsp, err
}
+func (w *Worker) markDestroying() {
+ w.state.set(StateDestroying)
+}
+
func (w *Worker) start() error {
if err := w.cmd.Start(); err != nil {
close(w.waitDone)