diff options
-rw-r--r-- | cmd/rr/debug/debugger.go | 5 | ||||
-rw-r--r-- | error_buffer.go | 70 | ||||
-rw-r--r-- | error_buffer_test.go | 46 | ||||
-rw-r--r-- | service/http/service.go | 21 | ||||
-rw-r--r-- | static_pool.go | 5 | ||||
-rw-r--r-- | worker.go | 5 |
6 files changed, 104 insertions, 48 deletions
diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 6c23b057..0dca43de 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/cmd/rr/utils" "github.com/spiral/roadrunner/service/http" + "strings" ) // Listener creates new debug listener. @@ -57,7 +58,7 @@ func (s *debugger) listener(event int, ctx interface{}) { // outputs switch event { case roadrunner.EventStderrOutput: - s.logger.Warning(string(ctx.([]byte))) + s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n")) } // rr server events @@ -73,6 +74,8 @@ func (s *debugger) listener(event int, ctx interface{}) { case roadrunner.EventPoolError: s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx)) } + + //s.logger.Warning(event, ctx) } func statusColor(status int) string { diff --git a/error_buffer.go b/error_buffer.go index 27f35e78..8c240e26 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -5,51 +5,67 @@ import ( "sync" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form. +// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). const EventStderrOutput = 1900 // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buffer *bytes.Buffer - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + off int + lsn func(event int, ctx interface{}) +} + +func newErrBuffer() *errBuffer { + buf := &errBuffer{buf: make([]byte, 0)} + return buf } // Listen attaches error stream even listener. -func (b *errBuffer) Listen(l func(event int, ctx interface{})) { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { + eb.mu.Lock() + defer eb.mu.Unlock() - b.lsn = l + eb.lsn = l } -// Len returns the number of bytes of the unread portion of the errBuffer; -// b.Len() == len(b.Bytes()). -func (b *errBuffer) Len() int { - b.mu.Lock() - defer b.mu.Unlock() +// Len returns the number of buf of the unread portion of the errBuffer; +// buf.Len() == len(buf.Bytes()). +func (eb *errBuffer) Len() int { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Len() + // currently active message + return len(eb.buf) - eb.off } // Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. If the -// errBuffer becomes too large, Write will panic with ErrTooLarge. -func (b *errBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - if b.lsn != nil { - b.lsn(EventStderrOutput, p) +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.buf = append(eb.buf, p...) + for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { + eb.lsn(EventStderrOutput, msg) } - return b.buffer.Write(p) + return len(p), nil } // Strings fetches all errBuffer data into string. -func (b *errBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() + + return string(eb.buf[eb.off:]) +} + +func (eb *errBuffer) fetchMsg() []byte { + if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { + eb.off += i + 2 + return eb.buf[eb.off-i-2 : eb.off] + } - return b.buffer.String() + return nil } diff --git a/error_buffer_test.go b/error_buffer_test.go index aa6a17f2..2bf98915 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -1,31 +1,61 @@ package roadrunner import ( - "bytes" "github.com/stretchr/testify/assert" "testing" ) func TestErrBuffer_Write_Len(t *testing.T) { - buf := &errBuffer{buffer: new(bytes.Buffer)} + buf := newErrBuffer() + buf.Write([]byte("hello")) assert.Equal(t, 5, buf.Len()) - assert.Equal(t, buf.String(), "hello") + assert.Equal(t, "hello", buf.String()) } func TestErrBuffer_Write_Event(t *testing.T) { - buf := &errBuffer{buffer: new(bytes.Buffer)} + buf := newErrBuffer() tr := make(chan interface{}) buf.Listen(func(event int, ctx interface{}) { assert.Equal(t, EventStderrOutput, event) - assert.Equal(t, []byte("hello"), ctx) + assert.Equal(t, []byte("hello\n\n"), ctx) close(tr) }) - buf.Write([]byte("hello")) + buf.Write([]byte("hello\n\n")) <-tr - assert.Equal(t, 5, buf.Len()) - assert.Equal(t, buf.String(), "hello") + + // messages are read + assert.Equal(t, 0, buf.Len()) + assert.Equal(t, "", buf.String()) +} + +func TestErrBuffer_Write_Event_Separated(t *testing.T) { + buf := newErrBuffer() + + tr := make(chan interface{}) + buf.Listen(func(event int, ctx interface{}) { + assert.Equal(t, EventStderrOutput, event) + assert.Equal(t, []byte("hello\n\n"), ctx) + close(tr) + }) + + buf.Write([]byte("hel")) + buf.Write([]byte("lo\n\n")) + buf.Write([]byte("ending")) + + <-tr + assert.Equal(t, 6, buf.Len()) + assert.Equal(t, "ending", buf.String()) +} + +func TestErrBuffer_Write_Remaining(t *testing.T) { + buf := newErrBuffer() + + buf.Write([]byte("hel")) + + assert.Equal(t, 3, buf.Len()) + assert.Equal(t, "hel", buf.String()) } diff --git a/service/http/service.go b/service/http/service.go index 3d200845..d98a4596 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -7,6 +7,7 @@ import ( "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" + "sync/atomic" ) // ID contains default svc name. @@ -21,10 +22,11 @@ type Service struct { lsns []func(event int, ctx interface{}) mdws []middleware - mu sync.Mutex - rr *roadrunner.Server - srv *Handler - http *http.Server + mu sync.Mutex + rr *roadrunner.Server + inStopping int32 + srv *Handler + http *http.Server } // AddMiddleware adds new net/http middleware. @@ -95,9 +97,12 @@ func (s *Service) Serve() error { // Stop stops the svc. func (s *Service) Stop() { + atomic.AddInt32(&s.inStopping, 1) + defer atomic.AddInt32(&s.inStopping, -1) + s.mu.Lock() defer s.mu.Unlock() - if s.http == nil { + if s.http == nil || s.stopping() { return } @@ -115,12 +120,16 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.srv.ServeHTTP(w, r) } +func (s *Service) stopping() bool { + return atomic.LoadInt32(&s.inStopping) != 0 +} + func (s *Service) listener(event int, ctx interface{}) { for _, l := range s.lsns { l(event, ctx) } - if event == roadrunner.EventServerFailure { + if event == roadrunner.EventServerFailure && !s.stopping() { // attempting rr server restart if err := s.rr.Start(); err != nil { s.Stop() diff --git a/static_pool.go b/static_pool.go index ccd3f27a..cce6bd6a 100644 --- a/static_pool.go +++ b/static_pool.go @@ -28,10 +28,10 @@ type StaticPool struct { // active task executions tasks sync.WaitGroup - // workers circular allocation buffer + // workers circular allocation buf free chan *Worker - // number of workers expected to be dead in a buffer. + // number of workers expected to be dead in a buf. numDead int64 // protects state of worker list, does not affect allocation @@ -151,7 +151,6 @@ func (p *StaticPool) Destroy() { var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) - go w.Stop() go func(w *Worker) { defer wg.Done() p.destroyWorker(w, nil) @@ -1,7 +1,6 @@ package roadrunner import ( - "bytes" "fmt" "github.com/pkg/errors" "github.com/spiral/goridge" @@ -24,7 +23,7 @@ type Worker struct { Created time.Time // state holds information about current worker state, - // number of worker executions, last status change time. + // number of worker executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. state *state @@ -60,7 +59,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) { w := &Worker{ Created: time.Now(), cmd: cmd, - err: &errBuffer{buffer: new(bytes.Buffer)}, + err: newErrBuffer(), waitDone: make(chan interface{}), state: newState(StateInactive), } |