diff options
author | Valery Piashchynski <[email protected]> | 2020-12-15 00:45:56 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-15 00:45:56 +0300 |
commit | 3cf93d96d24201758fb5c23529af90050c3914cb (patch) | |
tree | 73241d2205034adfe65b119864f9041599a50498 | |
parent | 9cc8aef1f64f21d7238a3234df98657289a75db5 (diff) |
Rotate ports
Remaster worker_watcher
-rw-r--r-- | .github/workflows/build.yml | 2 | ||||
-rw-r--r-- | plugins/http/tests/configs/.rr-h2c.yaml | 9 | ||||
-rw-r--r-- | plugins/http/tests/configs/.rr-ssl-push.yaml | 8 | ||||
-rw-r--r-- | plugins/http/tests/configs/.rr-ssl-redirect.yaml | 8 | ||||
-rw-r--r-- | plugins/http/tests/configs/.rr-ssl.yaml | 2 | ||||
-rw-r--r-- | plugins/http/tests/http_test.go | 2 | ||||
-rwxr-xr-x | worker.go | 65 | ||||
-rwxr-xr-x | worker_watcher.go | 170 |
8 files changed, 120 insertions, 146 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e24fb764..685aed7a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,7 +8,7 @@ jobs: golang: name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) runs-on: ${{ matrix.os }} - timeout-minutes: 30 + timeout-minutes: 20 strategy: fail-fast: false matrix: diff --git a/plugins/http/tests/configs/.rr-h2c.yaml b/plugins/http/tests/configs/.rr-h2c.yaml index 316daea9..d1b24338 100644 --- a/plugins/http/tests/configs/.rr-h2c.yaml +++ b/plugins/http/tests/configs/.rr-h2c.yaml @@ -20,15 +20,6 @@ http: maxJobs: 0 allocateTimeout: 60s destroyTimeout: 60s - - ssl: - port: 8891 - redirect: false - cert: fixtures/server.crt - key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 http2: enabled: true h2c: true diff --git a/plugins/http/tests/configs/.rr-ssl-push.yaml b/plugins/http/tests/configs/.rr-ssl-push.yaml index 90a99192..02de906a 100644 --- a/plugins/http/tests/configs/.rr-ssl-push.yaml +++ b/plugins/http/tests/configs/.rr-ssl-push.yaml @@ -26,10 +26,4 @@ http: redirect: true cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: false - h2c: false - maxConcurrentStreams: 128
\ No newline at end of file + # rootCa: root.crt
\ No newline at end of file diff --git a/plugins/http/tests/configs/.rr-ssl-redirect.yaml b/plugins/http/tests/configs/.rr-ssl-redirect.yaml index 1878ba53..0ba1753e 100644 --- a/plugins/http/tests/configs/.rr-ssl-redirect.yaml +++ b/plugins/http/tests/configs/.rr-ssl-redirect.yaml @@ -26,10 +26,4 @@ http: redirect: true cert: fixtures/server.crt key: fixtures/server.key - # rootCa: root.crt - fcgi: - address: tcp://0.0.0.0:6920 - http2: - enabled: false - h2c: false - maxConcurrentStreams: 128
\ No newline at end of file + # rootCa: root.crt
\ No newline at end of file diff --git a/plugins/http/tests/configs/.rr-ssl.yaml b/plugins/http/tests/configs/.rr-ssl.yaml index 127c1678..fb54d3fa 100644 --- a/plugins/http/tests/configs/.rr-ssl.yaml +++ b/plugins/http/tests/configs/.rr-ssl.yaml @@ -28,7 +28,7 @@ http: key: fixtures/server.key # rootCa: root.crt fcgi: - address: tcp://0.0.0.0:6920 + address: tcp://0.0.0.0:16920 http2: enabled: false h2c: false diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 1902cc5e..f68cd42c 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -334,7 +334,7 @@ func sslEcho(t *testing.T) { } func fcgiEcho(t *testing.T) { - fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6920") + fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:16920") fcgiHandler := gofast.NewHandler( gofast.BasicParamsMap(gofast.BasicSession), @@ -58,6 +58,13 @@ type WorkerEvent struct { Payload interface{} } +var pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 10240) + return buf + }, +} + type WorkerBase interface { fmt.Stringer @@ -243,6 +250,8 @@ func (w *WorkerProcess) Wait() error { // if process return code > 0, here will be an error from stderr (if presents) if w.stderr.Len() > 0 { err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} } w.mu.RUnlock() @@ -311,47 +320,43 @@ func (w *WorkerProcess) Kill() error { return nil } +func (w *WorkerProcess) put(data []byte) { + data = make([]byte, 10240) + pool.Put(data) +} + +func (w *WorkerProcess) get() []byte { + return pool.Get().([]byte) +} + // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. func (w *WorkerProcess) watch() { - proxy := make(chan [10240]byte, 5) - go func() { for { select { case <-w.stop: + buf := w.get() // read the last data - var buf [10240]byte - _, err := w.rd.Read(buf[:]) - if err != nil { - panic(err) - } - proxy <- buf - // and close - close(proxy) + n, _ := w.rd.Read(buf[:]) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write(buf[:n]) + w.mu.Unlock() + w.put(buf) return default: - var buf [10240]byte - _, err := w.rd.Read(buf[:]) - if err != nil { - panic(err) - } - proxy <- buf + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(buf[:]) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write(buf[:n]) + w.mu.Unlock() + w.put(buf) } } }() - - for { - select { - case payload, ok := <-proxy: - if !ok { - return - } - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]}) - w.mu.Lock() - // write new message - w.stderr.Write(payload[:]) - w.mu.Unlock() - } - } } diff --git a/worker_watcher.go b/worker_watcher.go index 8bc147d0..3afb91ca 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -11,27 +11,31 @@ import ( ) type Stack struct { - workers []WorkerBase - mutex sync.RWMutex - destroy bool + workers []WorkerBase + mutex sync.RWMutex + destroy bool + actualNumOfWorkers int64 } func NewWorkersStack() *Stack { + w := runtime.NumCPU() return &Stack{ - workers: make([]WorkerBase, 0, runtime.NumCPU()), + workers: make([]WorkerBase, 0, w), + actualNumOfWorkers: 0, } } func (stack *Stack) Reset() { stack.mutex.Lock() defer stack.mutex.Unlock() - + stack.actualNumOfWorkers = 0 stack.workers = nil } func (stack *Stack) Push(w WorkerBase) { stack.mutex.Lock() defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ stack.workers = append(stack.workers, w) } @@ -56,10 +60,60 @@ func (stack *Stack) Pop() (WorkerBase, bool) { w := stack.workers[len(stack.workers)-1] stack.workers = stack.workers[:len(stack.workers)-1] - + stack.actualNumOfWorkers-- return w, false } +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if len(stack.workers) != int(stack.actualNumOfWorkers) { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(StateDestroyed) + } + stack.mutex.Unlock() + tt.Stop() + // clear + stack.Reset() + return + } + } +} + type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(workers []WorkerBase) error @@ -151,7 +205,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) if w == nil { continue } - ww.ReduceWorkersCount() + //ww.ReduceWorkersCount() return w, nil case <-ctx.Done(): return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) @@ -159,7 +213,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } } - ww.ReduceWorkersCount() + //ww.ReduceWorkersCount() return w, nil } @@ -184,78 +238,38 @@ func (ww *workerWatcher) AllocateNew() error { } func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { - ww.stack.mutex.Lock() + ww.mutex.Lock() + defer ww.mutex.Unlock() + const op = errors.Op("remove worker") - defer ww.stack.mutex.Unlock() pid := wb.Pid() - for i := 0; i < len(ww.stack.workers); i++ { - if ww.stack.workers[i].Pid() == pid { - // found in the stack - // remove worker - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - - wb.State().Set(StateInvalid) - err := wb.Kill() - if err != nil { - return errors.E(op, err) - } - break + + if ww.stack.FindAndRemoveByPid(pid) { + wb.State().Set(StateInvalid) + err := wb.Kill() + if err != nil { + return errors.E(op, err) } + return nil } - // worker currently handle request, set state Remove + wb.State().Set(StateRemove) return nil + } // O(1) operation func (ww *workerWatcher) PushWorker(w WorkerBase) { - ww.IncreaseWorkersCount() - ww.stack.Push(w) -} - -func (ww *workerWatcher) ReduceWorkersCount() { - ww.mutex.Lock() - ww.actualNumWorkers-- - ww.mutex.Unlock() -} -func (ww *workerWatcher) IncreaseWorkersCount() { + //ww.IncreaseWorkersCount() ww.mutex.Lock() - ww.actualNumWorkers++ - ww.mutex.Unlock() + defer ww.mutex.Unlock() + ww.stack.Push(w) } // Destroy all underlying stack (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - ww.stack.mutex.Lock() - ww.stack.destroy = true - ww.stack.mutex.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-tt.C: - ww.stack.mutex.Lock() - if len(ww.stack.workers) != int(ww.actualNumWorkers) { - ww.stack.mutex.Unlock() - continue - } - ww.stack.mutex.Unlock() - // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack - // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - ww.stack.workers[i].State().Set(StateDestroyed) - } - ww.stack.mutex.Unlock() - tt.Stop() - // clear - ww.stack.Reset() - return - } - } + // destroy stack + ww.stack.Destroy(ctx) } // Warning, this is O(n) operation, and it will return copy of the actual workers @@ -287,37 +301,13 @@ func (ww *workerWatcher) wait(w WorkerBase) { return } - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - ww.stack.mutex.Unlock() - - err = ww.AllocateNew() - if err != nil { - ww.events.Push(PoolEvent{ - Event: EventPoolError, - Payload: errors.E(op, err), - }) - } - - return - } - } - - ww.stack.mutex.Unlock() - - // worker not in the stack (not returned), forget and allocate new + _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, Payload: errors.E(op, err), }) - return } } |