summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/debug/debugger.go5
-rw-r--r--error_buffer.go70
-rw-r--r--error_buffer_test.go46
-rw-r--r--service/http/service.go21
-rw-r--r--static_pool.go5
-rw-r--r--worker.go5
6 files changed, 104 insertions, 48 deletions
diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go
index 6c23b057..0dca43de 100644
--- a/cmd/rr/debug/debugger.go
+++ b/cmd/rr/debug/debugger.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/cmd/rr/utils"
"github.com/spiral/roadrunner/service/http"
+ "strings"
)
// Listener creates new debug listener.
@@ -57,7 +58,7 @@ func (s *debugger) listener(event int, ctx interface{}) {
// outputs
switch event {
case roadrunner.EventStderrOutput:
- s.logger.Warning(string(ctx.([]byte)))
+ s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n"))
}
// rr server events
@@ -73,6 +74,8 @@ func (s *debugger) listener(event int, ctx interface{}) {
case roadrunner.EventPoolError:
s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
}
+
+ //s.logger.Warning(event, ctx)
}
func statusColor(status int) string {
diff --git a/error_buffer.go b/error_buffer.go
index 27f35e78..8c240e26 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -5,51 +5,67 @@ import (
"sync"
)
-// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form.
+// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
const EventStderrOutput = 1900
// thread safe errBuffer
type errBuffer struct {
- mu sync.Mutex
- buffer *bytes.Buffer
- lsn func(event int, ctx interface{})
+ mu sync.Mutex
+ buf []byte
+ off int
+ lsn func(event int, ctx interface{})
+}
+
+func newErrBuffer() *errBuffer {
+ buf := &errBuffer{buf: make([]byte, 0)}
+ return buf
}
// Listen attaches error stream even listener.
-func (b *errBuffer) Listen(l func(event int, ctx interface{})) {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- b.lsn = l
+ eb.lsn = l
}
-// Len returns the number of bytes of the unread portion of the errBuffer;
-// b.Len() == len(b.Bytes()).
-func (b *errBuffer) Len() int {
- b.mu.Lock()
- defer b.mu.Unlock()
+// Len returns the number of buf of the unread portion of the errBuffer;
+// buf.Len() == len(buf.Bytes()).
+func (eb *errBuffer) Len() int {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Len()
+ // currently active message
+ return len(eb.buf) - eb.off
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil. If the
-// errBuffer becomes too large, Write will panic with ErrTooLarge.
-func (b *errBuffer) Write(p []byte) (n int, err error) {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.lsn != nil {
- b.lsn(EventStderrOutput, p)
+// needed. The return value n is the length of p; err is always nil.
+func (eb *errBuffer) Write(p []byte) (int, error) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ eb.buf = append(eb.buf, p...)
+ for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() {
+ eb.lsn(EventStderrOutput, msg)
}
- return b.buffer.Write(p)
+ return len(p), nil
}
// Strings fetches all errBuffer data into string.
-func (b *errBuffer) String() string {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) String() string {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ return string(eb.buf[eb.off:])
+}
+
+func (eb *errBuffer) fetchMsg() []byte {
+ if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 {
+ eb.off += i + 2
+ return eb.buf[eb.off-i-2 : eb.off]
+ }
- return b.buffer.String()
+ return nil
}
diff --git a/error_buffer_test.go b/error_buffer_test.go
index aa6a17f2..2bf98915 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -1,31 +1,61 @@
package roadrunner
import (
- "bytes"
"github.com/stretchr/testify/assert"
"testing"
)
func TestErrBuffer_Write_Len(t *testing.T) {
- buf := &errBuffer{buffer: new(bytes.Buffer)}
+ buf := newErrBuffer()
+
buf.Write([]byte("hello"))
assert.Equal(t, 5, buf.Len())
- assert.Equal(t, buf.String(), "hello")
+ assert.Equal(t, "hello", buf.String())
}
func TestErrBuffer_Write_Event(t *testing.T) {
- buf := &errBuffer{buffer: new(bytes.Buffer)}
+ buf := newErrBuffer()
tr := make(chan interface{})
buf.Listen(func(event int, ctx interface{}) {
assert.Equal(t, EventStderrOutput, event)
- assert.Equal(t, []byte("hello"), ctx)
+ assert.Equal(t, []byte("hello\n\n"), ctx)
close(tr)
})
- buf.Write([]byte("hello"))
+ buf.Write([]byte("hello\n\n"))
<-tr
- assert.Equal(t, 5, buf.Len())
- assert.Equal(t, buf.String(), "hello")
+
+ // messages are read
+ assert.Equal(t, 0, buf.Len())
+ assert.Equal(t, "", buf.String())
+}
+
+func TestErrBuffer_Write_Event_Separated(t *testing.T) {
+ buf := newErrBuffer()
+
+ tr := make(chan interface{})
+ buf.Listen(func(event int, ctx interface{}) {
+ assert.Equal(t, EventStderrOutput, event)
+ assert.Equal(t, []byte("hello\n\n"), ctx)
+ close(tr)
+ })
+
+ buf.Write([]byte("hel"))
+ buf.Write([]byte("lo\n\n"))
+ buf.Write([]byte("ending"))
+
+ <-tr
+ assert.Equal(t, 6, buf.Len())
+ assert.Equal(t, "ending", buf.String())
+}
+
+func TestErrBuffer_Write_Remaining(t *testing.T) {
+ buf := newErrBuffer()
+
+ buf.Write([]byte("hel"))
+
+ assert.Equal(t, 3, buf.Len())
+ assert.Equal(t, "hel", buf.String())
}
diff --git a/service/http/service.go b/service/http/service.go
index 3d200845..d98a4596 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
+ "sync/atomic"
)
// ID contains default svc name.
@@ -21,10 +22,11 @@ type Service struct {
lsns []func(event int, ctx interface{})
mdws []middleware
- mu sync.Mutex
- rr *roadrunner.Server
- srv *Handler
- http *http.Server
+ mu sync.Mutex
+ rr *roadrunner.Server
+ inStopping int32
+ srv *Handler
+ http *http.Server
}
// AddMiddleware adds new net/http middleware.
@@ -95,9 +97,12 @@ func (s *Service) Serve() error {
// Stop stops the svc.
func (s *Service) Stop() {
+ atomic.AddInt32(&s.inStopping, 1)
+ defer atomic.AddInt32(&s.inStopping, -1)
+
s.mu.Lock()
defer s.mu.Unlock()
- if s.http == nil {
+ if s.http == nil || s.stopping() {
return
}
@@ -115,12 +120,16 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.srv.ServeHTTP(w, r)
}
+func (s *Service) stopping() bool {
+ return atomic.LoadInt32(&s.inStopping) != 0
+}
+
func (s *Service) listener(event int, ctx interface{}) {
for _, l := range s.lsns {
l(event, ctx)
}
- if event == roadrunner.EventServerFailure {
+ if event == roadrunner.EventServerFailure && !s.stopping() {
// attempting rr server restart
if err := s.rr.Start(); err != nil {
s.Stop()
diff --git a/static_pool.go b/static_pool.go
index ccd3f27a..cce6bd6a 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -28,10 +28,10 @@ type StaticPool struct {
// active task executions
tasks sync.WaitGroup
- // workers circular allocation buffer
+ // workers circular allocation buf
free chan *Worker
- // number of workers expected to be dead in a buffer.
+ // number of workers expected to be dead in a buf.
numDead int64
// protects state of worker list, does not affect allocation
@@ -151,7 +151,6 @@ func (p *StaticPool) Destroy() {
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
- go w.Stop()
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w, nil)
diff --git a/worker.go b/worker.go
index 811bda5f..d1612edd 100644
--- a/worker.go
+++ b/worker.go
@@ -1,7 +1,6 @@
package roadrunner
import (
- "bytes"
"fmt"
"github.com/pkg/errors"
"github.com/spiral/goridge"
@@ -24,7 +23,7 @@ type Worker struct {
Created time.Time
// state holds information about current worker state,
- // number of worker executions, last status change time.
+ // number of worker executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
state *state
@@ -60,7 +59,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) {
w := &Worker{
Created: time.Now(),
cmd: cmd,
- err: &errBuffer{buffer: new(bytes.Buffer)},
+ err: newErrBuffer(),
waitDone: make(chan interface{}),
state: newState(StateInactive),
}