-rw-r--r--  .github/workflows/build.yml                       |   2
-rw-r--r--  plugins/http/tests/configs/.rr-h2c.yaml           |   9
-rw-r--r--  plugins/http/tests/configs/.rr-ssl-push.yaml      |   8
-rw-r--r--  plugins/http/tests/configs/.rr-ssl-redirect.yaml  |   8
-rw-r--r--  plugins/http/tests/configs/.rr-ssl.yaml           |   2
-rw-r--r--  plugins/http/tests/http_test.go                   |   2
-rwxr-xr-x  worker.go                                         |  65
-rwxr-xr-x  worker_watcher.go                                 | 170
8 files changed, 120 insertions(+), 146 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index e24fb764..685aed7a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -8,7 +8,7 @@ jobs:
golang:
name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
- timeout-minutes: 30
+ timeout-minutes: 20
strategy:
fail-fast: false
matrix:
diff --git a/plugins/http/tests/configs/.rr-h2c.yaml b/plugins/http/tests/configs/.rr-h2c.yaml
index 316daea9..d1b24338 100644
--- a/plugins/http/tests/configs/.rr-h2c.yaml
+++ b/plugins/http/tests/configs/.rr-h2c.yaml
@@ -20,15 +20,6 @@ http:
maxJobs: 0
allocateTimeout: 60s
destroyTimeout: 60s
-
- ssl:
- port: 8891
- redirect: false
- cert: fixtures/server.crt
- key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
http2:
enabled: true
h2c: true
diff --git a/plugins/http/tests/configs/.rr-ssl-push.yaml b/plugins/http/tests/configs/.rr-ssl-push.yaml
index 90a99192..02de906a 100644
--- a/plugins/http/tests/configs/.rr-ssl-push.yaml
+++ b/plugins/http/tests/configs/.rr-ssl-push.yaml
@@ -26,10 +26,4 @@ http:
redirect: true
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
- http2:
- enabled: false
- h2c: false
- maxConcurrentStreams: 128
\ No newline at end of file
+ # rootCa: root.crt
\ No newline at end of file
diff --git a/plugins/http/tests/configs/.rr-ssl-redirect.yaml b/plugins/http/tests/configs/.rr-ssl-redirect.yaml
index 1878ba53..0ba1753e 100644
--- a/plugins/http/tests/configs/.rr-ssl-redirect.yaml
+++ b/plugins/http/tests/configs/.rr-ssl-redirect.yaml
@@ -26,10 +26,4 @@ http:
redirect: true
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
- fcgi:
- address: tcp://0.0.0.0:6920
- http2:
- enabled: false
- h2c: false
- maxConcurrentStreams: 128
\ No newline at end of file
+ # rootCa: root.crt
\ No newline at end of file
diff --git a/plugins/http/tests/configs/.rr-ssl.yaml b/plugins/http/tests/configs/.rr-ssl.yaml
index 127c1678..fb54d3fa 100644
--- a/plugins/http/tests/configs/.rr-ssl.yaml
+++ b/plugins/http/tests/configs/.rr-ssl.yaml
@@ -28,7 +28,7 @@ http:
key: fixtures/server.key
# rootCa: root.crt
fcgi:
- address: tcp://0.0.0.0:6920
+ address: tcp://0.0.0.0:16920
http2:
enabled: false
h2c: false
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 1902cc5e..f68cd42c 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -334,7 +334,7 @@ func sslEcho(t *testing.T) {
}
func fcgiEcho(t *testing.T) {
- fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6920")
+ fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:16920")
fcgiHandler := gofast.NewHandler(
gofast.BasicParamsMap(gofast.BasicSession),
diff --git a/worker.go b/worker.go
index f17f2c07..e639c122 100755
--- a/worker.go
+++ b/worker.go
@@ -58,6 +58,13 @@ type WorkerEvent struct {
Payload interface{}
}
+var pool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, 10240)
+ return buf
+ },
+}
+
type WorkerBase interface {
fmt.Stringer
@@ -243,6 +250,8 @@ func (w *WorkerProcess) Wait() error {
// if the process exit code > 0, there will be an error from stderr (if present)
if w.stderr.Len() > 0 {
err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr watcher goroutine
+ w.stop <- struct{}{}
}
w.mu.RUnlock()
@@ -311,47 +320,43 @@ func (w *WorkerProcess) Kill() error {
return nil
}
+func (w *WorkerProcess) put(data []byte) {
+ data = make([]byte, 10240)
+ pool.Put(data)
+}
+
+func (w *WorkerProcess) get() []byte {
+ return pool.Get().([]byte)
+}
+
// watch reads the worker's stderr in the background and forwards each chunk
// to the event bus and the mutex-guarded stderr buffer.
func (w *WorkerProcess) watch() {
- proxy := make(chan [10240]byte, 5)
-
go func() {
for {
select {
case <-w.stop:
+ buf := w.get()
// read the last data
- var buf [10240]byte
- _, err := w.rd.Read(buf[:])
- if err != nil {
- panic(err)
- }
- proxy <- buf
- // and close
- close(proxy)
+ n, _ := w.rd.Read(buf[:])
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write(buf[:n])
+ w.mu.Unlock()
+ w.put(buf)
return
default:
- var buf [10240]byte
- _, err := w.rd.Read(buf[:])
- if err != nil {
- panic(err)
- }
- proxy <- buf
+ // read at most 10KB of stderr per read
+ buf := w.get()
+ n, _ := w.rd.Read(buf[:])
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write(buf[:n])
+ w.mu.Unlock()
+ w.put(buf)
}
}
}()
-
- for {
- select {
- case payload, ok := <-proxy:
- if !ok {
- return
- }
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]})
- w.mu.Lock()
- // write new message
- w.stderr.Write(payload[:])
- w.mu.Unlock()
- }
- }
}
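
The rewritten watch() above drops the intermediate proxy channel: every read of the worker's stderr pipe now borrows a 10KB buffer from the sync.Pool, pushes the chunk to the event bus, appends it to the guarded stderr buffer, and returns the buffer to the pool. A minimal, self-contained sketch of that read-with-pooled-buffers pattern follows; the reader, mutex, and function names are illustrative stand-ins, not the actual roadrunner types.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool hands out fixed-size scratch buffers so the stderr reader does not
// allocate 10KB on every loop iteration.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, 10240) },
}

// drain reads rd one pooled buffer at a time and appends each chunk to dst
// under mu, mirroring the default branch of the rewritten watch() loop.
func drain(rd io.Reader, dst *bytes.Buffer, mu *sync.Mutex) {
	for {
		buf := bufPool.Get().([]byte)
		n, err := rd.Read(buf)
		if n > 0 {
			mu.Lock()
			dst.Write(buf[:n])
			mu.Unlock()
		}
		bufPool.Put(buf) // hand the same slice back for reuse
		if err != nil {  // io.EOF or a closed pipe stops the reader
			return
		}
	}
}

func main() {
	var out bytes.Buffer
	var mu sync.Mutex
	drain(strings.NewReader("worker stderr output"), &out, &mu)
	fmt.Println(out.String())
}

One detail worth noting: the diff's put() allocates a fresh slice before returning it to the pool, so the buffer that actually held data is discarded rather than reused; the sketch above returns the original slice instead.
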
diff --git a/worker_watcher.go b/worker_watcher.go
index 8bc147d0..3afb91ca 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -11,27 +11,31 @@ import (
)
type Stack struct {
- workers []WorkerBase
- mutex sync.RWMutex
- destroy bool
+ workers []WorkerBase
+ mutex sync.RWMutex
+ destroy bool
+ actualNumOfWorkers int64
}
func NewWorkersStack() *Stack {
+ w := runtime.NumCPU()
return &Stack{
- workers: make([]WorkerBase, 0, runtime.NumCPU()),
+ workers: make([]WorkerBase, 0, w),
+ actualNumOfWorkers: 0,
}
}
func (stack *Stack) Reset() {
stack.mutex.Lock()
defer stack.mutex.Unlock()
-
+ stack.actualNumOfWorkers = 0
stack.workers = nil
}
func (stack *Stack) Push(w WorkerBase) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
+ stack.actualNumOfWorkers++
stack.workers = append(stack.workers, w)
}
@@ -56,10 +60,60 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
w := stack.workers[len(stack.workers)-1]
stack.workers = stack.workers[:len(stack.workers)-1]
-
+ stack.actualNumOfWorkers--
return w, false
}
+func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ for i := 0; i < len(stack.workers); i++ {
+ // worker is in the stack, remove it in place
+ if stack.workers[i].Pid() == pid {
+ stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
+ stack.actualNumOfWorkers--
+ // worker found and removed
+ return true
+ }
+ }
+ // no worker with such ID
+ return false
+}
+
+// we also have to give the pool a chance to Push the worker back (return it)
+func (stack *Stack) Destroy(ctx context.Context) {
+ stack.mutex.Lock()
+ stack.destroy = true
+ stack.mutex.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ for {
+ select {
+ case <-tt.C:
+ stack.mutex.Lock()
+ // one of the workers might still be working
+ if len(stack.workers) != int(stack.actualNumOfWorkers) {
+ stack.mutex.Unlock()
+ continue
+ }
+ stack.mutex.Unlock()
+ // the mutex is not strictly necessary, but just to be safe:
+ // all workers are in the stack at this moment, Pop is blocked,
+ // and Push can't happen, since nothing can be popped anymore
+ stack.mutex.Lock()
+ for i := 0; i < len(stack.workers); i++ {
+ // set the state for the workers in the stack (unused at the moment)
+ stack.workers[i].State().Set(StateDestroyed)
+ }
+ stack.mutex.Unlock()
+ tt.Stop()
+ // clear
+ stack.Reset()
+ return
+ }
+ }
+}
+
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
AddToWatch(workers []WorkerBase) error
@@ -151,7 +205,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
if w == nil {
continue
}
- ww.ReduceWorkersCount()
+ //ww.ReduceWorkersCount()
return w, nil
case <-ctx.Done():
return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
@@ -159,7 +213,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
}
- ww.ReduceWorkersCount()
+ //ww.ReduceWorkersCount()
return w, nil
}
@@ -184,78 +238,38 @@ func (ww *workerWatcher) AllocateNew() error {
}
func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
- ww.stack.mutex.Lock()
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+
const op = errors.Op("remove worker")
- defer ww.stack.mutex.Unlock()
pid := wb.Pid()
- for i := 0; i < len(ww.stack.workers); i++ {
- if ww.stack.workers[i].Pid() == pid {
- // found in the stack
- // remove worker
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
-
- wb.State().Set(StateInvalid)
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
- }
- break
+
+ if ww.stack.FindAndRemoveByPid(pid) {
+ wb.State().Set(StateInvalid)
+ err := wb.Kill()
+ if err != nil {
+ return errors.E(op, err)
}
+ return nil
}
- // worker currently handle request, set state Remove
+
wb.State().Set(StateRemove)
return nil
+
}
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.IncreaseWorkersCount()
- ww.stack.Push(w)
-}
-
-func (ww *workerWatcher) ReduceWorkersCount() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}
-func (ww *workerWatcher) IncreaseWorkersCount() {
+ //ww.IncreaseWorkersCount()
ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ defer ww.mutex.Unlock()
+ ww.stack.Push(w)
}
// Destroy all underlying stack (but let them to complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- ww.stack.mutex.Lock()
- ww.stack.destroy = true
- ww.stack.mutex.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 100)
- for {
- select {
- case <-tt.C:
- ww.stack.mutex.Lock()
- if len(ww.stack.workers) != int(ww.actualNumWorkers) {
- ww.stack.mutex.Unlock()
- continue
- }
- ww.stack.mutex.Unlock()
- // unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
- ww.stack.workers[i].State().Set(StateDestroyed)
- }
- ww.stack.mutex.Unlock()
- tt.Stop()
- // clear
- ww.stack.Reset()
- return
- }
- }
+ // destroy stack
+ ww.stack.Destroy(ctx)
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
@@ -287,37 +301,13 @@ func (ww *workerWatcher) wait(w WorkerBase) {
return
}
- pid := w.Pid()
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // worker in the stack, reallocating
- if ww.stack.workers[i].Pid() == pid {
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
- ww.stack.mutex.Unlock()
-
- err = ww.AllocateNew()
- if err != nil {
- ww.events.Push(PoolEvent{
- Event: EventPoolError,
- Payload: errors.E(op, err),
- })
- }
-
- return
- }
- }
-
- ww.stack.mutex.Unlock()
-
- // worker not in the stack (not returned), forget and allocate new
+ _ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
Payload: errors.E(op, err),
})
- return
}
}
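
Taken together, the worker bookkeeping now lives inside Stack: actualNumOfWorkers counts the workers the stack owns, FindAndRemoveByPid drops a worker in place (used by both RemoveWorker and wait), and Destroy polls every 100ms, waiting for the stack's length to match its worker count before marking everything destroyed and resetting the slice. Below is a compact, runnable sketch of that drain-and-destroy pattern under simplified assumptions: the worker type, the explicit allocated count, and the timings are illustrative stand-ins (the real Stack tracks the count itself and Destroy takes a context).

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a minimal stand-in for roadrunner's WorkerBase.
type worker struct {
	pid   int64
	state string
}

// stack mirrors the shape of the diff's Stack: a mutex-guarded LIFO of workers
// plus a flag that marks the stack as being torn down.
type stack struct {
	mu      sync.Mutex
	workers []*worker
	destroy bool
}

func (s *stack) push(w *worker) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.workers = append(s.workers, w)
}

func (s *stack) pop() (*worker, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.destroy || len(s.workers) == 0 {
		return nil, false
	}
	w := s.workers[len(s.workers)-1]
	s.workers = s.workers[:len(s.workers)-1]
	return w, true
}

// destroyAll sketches the polling Destroy from the diff: flag the stack as
// destroying, then tick every 100ms until all allocated workers have been
// pushed back, mark them destroyed, and reset the slice.
func (s *stack) destroyAll(allocated int) {
	s.mu.Lock()
	s.destroy = true
	s.mu.Unlock()

	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()
	for range tick.C {
		s.mu.Lock()
		if len(s.workers) != allocated { // a worker is still out handling a request
			s.mu.Unlock()
			continue
		}
		for _, w := range s.workers {
			w.state = "destroyed"
		}
		s.workers = nil
		s.mu.Unlock()
		return
	}
}

func main() {
	s := &stack{}
	s.push(&worker{pid: 1})
	w, _ := s.pop() // the worker is "busy" outside the stack

	go func() {
		// return the busy worker shortly after Destroy starts waiting
		time.Sleep(150 * time.Millisecond)
		s.push(w)
	}()

	s.destroyAll(1)
	fmt.Println("stack drained, worker state:", w.state)
}
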