summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md7
-rwxr-xr-xbuild.sh7
-rw-r--r--cmd/rr/cmd/serve.go9
-rw-r--r--cmd/rr/cmd/version.go8
-rw-r--r--cmd/rr/debug/debugger.go10
-rw-r--r--error_buffer.go108
-rw-r--r--error_buffer_test.go57
-rw-r--r--php-src/tests/http/echoerr.php12
-rw-r--r--server_config.go5
-rw-r--r--service/http/handler_test.go18
-rw-r--r--service/http/rpc_test.go66
-rw-r--r--service/http/service.go23
-rw-r--r--service/http/service_test.go60
-rw-r--r--service/rpc/config.go5
-rw-r--r--service/rpc/service.go2
-rw-r--r--static_pool.go24
-rw-r--r--worker.go7
17 files changed, 368 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a4eb922..7819504f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,13 @@
CHANGELOG
=========
+v1.0.2 (23.06.2018)
+-------
+- rr would provide error log from workers in realtime now
+- even better service shutdown
+- safer unix socket allocation
+- minor CS
+
v1.0.2 (19.06.2018)
-------
- more validations for user configs
diff --git a/build.sh b/build.sh
index fd84fc79..85b0b306 100755
--- a/build.sh
+++ b/build.sh
@@ -1,11 +1,8 @@
#!/bin/bash
-set -e
-
cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
-
# Pushes application version into the build information.
-RR_VERSION=1.0.1
+RR_VERSION=1.0.3
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
@@ -47,4 +44,4 @@ if [ "$1" == "all" ]; then
exit
fi
-CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go \ No newline at end of file
+CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go
diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go
index c53f7ce9..664baf15 100644
--- a/cmd/rr/cmd/serve.go
+++ b/cmd/rr/cmd/serve.go
@@ -36,12 +36,15 @@ func init() {
RunE: serveHandler,
})
- signal.Notify(stopSignal, syscall.SIGTERM)
- signal.Notify(stopSignal, syscall.SIGINT)
+ signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM)
}
func serveHandler(cmd *cobra.Command, args []string) error {
- go Container.Serve()
+ go func() {
+ Container.Serve()
+ stopSignal <- nil
+ }()
+
<-stopSignal
Container.Stop()
diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go
index 5edb7543..b134a70b 100644
--- a/cmd/rr/cmd/version.go
+++ b/cmd/rr/cmd/version.go
@@ -1,6 +1,10 @@
package cmd
+import "time"
+
var (
- Version = "1.0.0" // Placeholder for the version
- BuildTime = "development" // Placeholder for the build time
+ // Version - defines build version.
+ Version = "development"
+ // BuildTime - defined build time.
+ BuildTime = time.Now()
)
diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go
index 0621285b..0dca43de 100644
--- a/cmd/rr/debug/debugger.go
+++ b/cmd/rr/debug/debugger.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/cmd/rr/utils"
"github.com/spiral/roadrunner/service/http"
+ "strings"
)
// Listener creates new debug listener.
@@ -45,7 +46,6 @@ func (s *debugger) listener(event int, ctx interface{}) {
"<white+hb>worker.%v</reset> <yellow>killed</red>",
*w.Pid,
))
-
case roadrunner.EventWorkerError:
err := ctx.(roadrunner.WorkerError)
s.logger.Error(utils.Sprintf(
@@ -55,6 +55,12 @@ func (s *debugger) listener(event int, ctx interface{}) {
))
}
+ // outputs
+ switch event {
+ case roadrunner.EventStderrOutput:
+ s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n"))
+ }
+
// rr server events
switch event {
case roadrunner.EventServerFailure:
@@ -68,6 +74,8 @@ func (s *debugger) listener(event int, ctx interface{}) {
case roadrunner.EventPoolError:
s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
}
+
+ //s.logger.Warning(event, ctx)
}
func statusColor(status int) string {
diff --git a/error_buffer.go b/error_buffer.go
index fcf566c8..8be9c5a8 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -1,39 +1,111 @@
package roadrunner
import (
- "bytes"
"sync"
+ "time"
+)
+
+const (
+ // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
+ EventStderrOutput = 1900
+
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output
+ // together since lastError update (required to keep error update together).
+ WaitDuration = 100 * time.Millisecond
)
// thread safe errBuffer
type errBuffer struct {
mu sync.Mutex
- buffer *bytes.Buffer
+ buf []byte
+ last int
+ wait *time.Timer
+ update chan interface{}
+ stop chan interface{}
+ lsn func(event int, ctx interface{})
+}
+
+func newErrBuffer() *errBuffer {
+ eb := &errBuffer{
+ buf: make([]byte, 0),
+ update: make(chan interface{}),
+ wait: time.NewTimer(WaitDuration),
+ stop: make(chan interface{}),
+ }
+
+ go func() {
+ for {
+ select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
+ case <-eb.wait.C:
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ case <-eb.stop:
+ eb.wait.Stop()
+
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return eb
+}
+
+// Listen attaches error stream even listener.
+func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ eb.lsn = l
}
-// Len returns the number of bytes of the unread portion of the errBuffer;
-// b.Len() == len(b.Bytes()).
-func (b *errBuffer) Len() int {
- b.mu.Lock()
- defer b.mu.Unlock()
+// Len returns the number of buf of the unread portion of the errBuffer;
+// buf.Len() == len(buf.Bytes()).
+func (eb *errBuffer) Len() int {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Len()
+ // currently active message
+ return len(eb.buf)
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil. If the
-// errBuffer becomes too large, Write will panic with ErrTooLarge.
-func (b *errBuffer) Write(p []byte) (n int, err error) {
- b.mu.Lock()
- defer b.mu.Unlock()
+// needed. The return value n is the length of p; err is always nil.
+func (eb *errBuffer) Write(p []byte) (int, error) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Write(p)
+ eb.buf = append(eb.buf, p...)
+ eb.update <- nil
+
+ return len(p), nil
}
// Strings fetches all errBuffer data into string.
-func (b *errBuffer) String() string {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) String() string {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ return string(eb.buf)
+}
- return b.buffer.String()
+// Close aggregation timer.
+func (eb *errBuffer) Close() error {
+ close(eb.stop)
+ return nil
}
diff --git a/error_buffer_test.go b/error_buffer_test.go
index afbc80e2..09ea4f03 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -1,14 +1,65 @@
package roadrunner
import (
- "bytes"
"github.com/stretchr/testify/assert"
"testing"
)
func TestErrBuffer_Write_Len(t *testing.T) {
- buf := &errBuffer{buffer: new(bytes.Buffer)}
+ buf := newErrBuffer()
+ defer buf.Close()
+
buf.Write([]byte("hello"))
assert.Equal(t, 5, buf.Len())
- assert.Equal(t, buf.String(), "hello")
+ assert.Equal(t, "hello", buf.String())
+}
+
+func TestErrBuffer_Write_Event(t *testing.T) {
+ buf := newErrBuffer()
+ defer buf.Close()
+
+ tr := make(chan interface{})
+ buf.Listen(func(event int, ctx interface{}) {
+ assert.Equal(t, EventStderrOutput, event)
+ assert.Equal(t, []byte("hello\n"), ctx)
+ close(tr)
+ })
+
+ buf.Write([]byte("hello\n"))
+
+ <-tr
+
+ // messages are read
+ assert.Equal(t, 6, buf.Len())
+ assert.Equal(t, "hello\n", buf.String())
+}
+
+func TestErrBuffer_Write_Event_Separated(t *testing.T) {
+ buf := newErrBuffer()
+ defer buf.Close()
+
+ tr := make(chan interface{})
+ buf.Listen(func(event int, ctx interface{}) {
+ assert.Equal(t, EventStderrOutput, event)
+ assert.Equal(t, []byte("hello\nending"), ctx)
+ close(tr)
+ })
+
+ buf.Write([]byte("hel"))
+ buf.Write([]byte("lo\n"))
+ buf.Write([]byte("ending"))
+
+ <-tr
+ assert.Equal(t, 12, buf.Len())
+ assert.Equal(t, "hello\nending", buf.String())
+}
+
+func TestErrBuffer_Write_Remaining(t *testing.T) {
+ buf := newErrBuffer()
+ defer buf.Close()
+
+ buf.Write([]byte("hel"))
+
+ assert.Equal(t, 3, buf.Len())
+ assert.Equal(t, "hel", buf.String())
}
diff --git a/php-src/tests/http/echoerr.php b/php-src/tests/http/echoerr.php
new file mode 100644
index 00000000..da2ff4d8
--- /dev/null
+++ b/php-src/tests/http/echoerr.php
@@ -0,0 +1,12 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ error_log(strtoupper($req->getQueryParams()['hello']));
+
+ $resp->getBody()->write(strtoupper($req->getQueryParams()['hello']));
+ return $resp->withStatus(201);
+} \ No newline at end of file
diff --git a/server_config.go b/server_config.go
index ecd7dd2b..b927c8c6 100644
--- a/server_config.go
+++ b/server_config.go
@@ -5,6 +5,7 @@ import (
"net"
"os/exec"
"strings"
+ "syscall"
"time"
)
@@ -51,6 +52,10 @@ func (cfg *ServerConfig) makeFactory() (Factory, error) {
return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)")
}
+ if dsn[0] == "unix" {
+ syscall.Unlink(dsn[1])
+ }
+
ln, err := net.Listen(dsn[0], dsn[1])
if err != nil {
return nil, err
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index 4a11c562..59a4c7c0 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -51,13 +51,13 @@ func TestServer_Echo(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- body, r, err := get("http://localhost:8077/?hello=world")
+ body, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 201, r.StatusCode)
assert.Equal(t, "WORLD", body)
@@ -686,13 +686,13 @@ func TestServer_Error(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- _, r, err := get("http://localhost:8077/?hello=world")
+ _, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
}
@@ -720,13 +720,13 @@ func TestServer_Error2(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- _, r, err := get("http://localhost:8077/?hello=world")
+ _, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
}
@@ -754,7 +754,7 @@ func TestServer_Error3(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -799,7 +799,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
st.rr.Start()
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -807,7 +807,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
bb := "WORLD"
for n := 0; n < b.N; n++ {
- r, err := http.Get("http://localhost:8077/?hello=world")
+ r, err := http.Get("http://localhost:8177/?hello=world")
if err != nil {
b.Fail()
}
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
index fc47a70f..c392b060 100644
--- a/service/http/rpc_test.go
+++ b/service/http/rpc_test.go
@@ -1,11 +1,14 @@
package http
import (
+ "encoding/json"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/rpc"
"github.com/stretchr/testify/assert"
+ "os"
+ "runtime"
"strconv"
"testing"
"time"
@@ -65,6 +68,67 @@ func Test_RPC(t *testing.T) {
assert.NotEqual(t, res, res2)
}
+func Test_RPC_Unix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ sock := `unix://` + os.TempDir() + `/rpc.unix`
+ j, _ := json.Marshal(sock)
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":` + string(j) + `}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ res, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := ""
+ assert.NoError(t, cl.Call("http.Reset", true, &r))
+ assert.Equal(t, "OK", r)
+
+ res2, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
+ assert.NotEqual(t, res, res2)
+}
+
func Test_Workers(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
@@ -74,7 +138,7 @@ func Test_Workers(t *testing.T) {
c.Register(ID, &Service{})
assert.NoError(t, c.Init(&testCfg{
- rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+ rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
httpCfg: `{
"enable": true,
"address": ":6029",
diff --git a/service/http/service.go b/service/http/service.go
index 3d200845..cef019b3 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
+ "sync/atomic"
)
// ID contains default svc name.
@@ -21,10 +22,11 @@ type Service struct {
lsns []func(event int, ctx interface{})
mdws []middleware
- mu sync.Mutex
- rr *roadrunner.Server
- srv *Handler
- http *http.Server
+ mu sync.Mutex
+ rr *roadrunner.Server
+ stopping int32
+ srv *Handler
+ http *http.Server
}
// AddMiddleware adds new net/http middleware.
@@ -95,6 +97,11 @@ func (s *Service) Serve() error {
// Stop stops the svc.
func (s *Service) Stop() {
+ if atomic.LoadInt32(&s.stopping) != 0 {
+ // already stopping
+ return
+ }
+
s.mu.Lock()
defer s.mu.Unlock()
if s.http == nil {
@@ -121,9 +128,11 @@ func (s *Service) listener(event int, ctx interface{}) {
}
if event == roadrunner.EventServerFailure {
- // attempting rr server restart
- if err := s.rr.Start(); err != nil {
- s.Stop()
+ if atomic.LoadInt32(&s.stopping) != 0 {
+ // attempting rr server restart
+ if err := s.rr.Start(); err != nil {
+ s.Stop()
+ }
}
}
}
diff --git a/service/http/service_test.go b/service/http/service_test.go
index 55fa660b..02d1c3f0 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) {
assert.Equal(t, "WORLD", string(b))
}
+func Test_Service_ErrorEcho(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echoerr pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ goterr := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventStderrOutput {
+ if string(ctx.([]byte)) == "WORLD\n" {
+ goterr <- nil
+ }
+ }
+ })
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ <-goterr
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+}
+
func Test_Service_Middleware(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
diff --git a/service/rpc/config.go b/service/rpc/config.go
index 06d63d65..e3168945 100644
--- a/service/rpc/config.go
+++ b/service/rpc/config.go
@@ -4,6 +4,7 @@ import (
"errors"
"net"
"strings"
+ "syscall"
)
type config struct {
@@ -21,6 +22,10 @@ func (cfg *config) listener() (net.Listener, error) {
return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
}
+ if dsn[0] == "unix" {
+ syscall.Unlink(dsn[1])
+ }
+
return net.Listen(dsn[0], dsn[1])
}
diff --git a/service/rpc/service.go b/service/rpc/service.go
index e1147754..82f26407 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -60,7 +60,7 @@ func (s *Service) Serve() error {
for {
select {
case <-s.stop:
- break
+ return
default:
conn, err := ln.Accept()
if err != nil {
diff --git a/static_pool.go b/static_pool.go
index b3e4f488..b974ac90 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -28,10 +28,10 @@ type StaticPool struct {
// active task executions
tasks sync.WaitGroup
- // workers circular allocation buffer
+ // workers circular allocation buf
free chan *Worker
- // number of workers expected to be dead in a buffer.
+ // number of workers expected to be dead in a buf.
numDead int64
// protects state of worker list, does not affect allocation
@@ -40,7 +40,7 @@ type StaticPool struct {
// all registered workers
workers []*Worker
- // pool is being destroying
+ // pool is being destroyed
inDestroy int32
// lsn is optional callback to handle worker create/destruct/error events.
@@ -83,6 +83,12 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
defer p.mul.Unlock()
p.lsn = l
+
+ p.muw.Lock()
+ for _, w := range p.workers {
+ w.err.Listen(p.lsn)
+ }
+ p.muw.Unlock()
}
// Config returns associated pool configuration. Immutable.
@@ -138,14 +144,12 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
// Destroy all underlying workers (but let them to complete the task).
func (p *StaticPool) Destroy() {
atomic.AddInt32(&p.inDestroy, 1)
- defer atomic.AddInt32(&p.inDestroy, -1)
p.tasks.Wait()
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
- go w.Stop()
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w, nil)
@@ -207,6 +211,12 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return nil, err
}
+ p.mul.Lock()
+ if p.lsn != nil {
+ w.err.Listen(p.lsn)
+ }
+ p.mul.Unlock()
+
p.throw(EventWorkerConstruct, w)
p.muw.Lock()
@@ -259,7 +269,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
}
- if !p.destroying() {
+ if !p.destroyed() {
nw, err := p.createWorker()
if err == nil {
p.free <- nw
@@ -275,7 +285,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
}
-func (p *StaticPool) destroying() bool {
+func (p *StaticPool) destroyed() bool {
return atomic.LoadInt32(&p.inDestroy) != 0
}
diff --git a/worker.go b/worker.go
index 811bda5f..c52960b2 100644
--- a/worker.go
+++ b/worker.go
@@ -1,7 +1,6 @@
package roadrunner
import (
- "bytes"
"fmt"
"github.com/pkg/errors"
"github.com/spiral/goridge"
@@ -24,7 +23,7 @@ type Worker struct {
Created time.Time
// state holds information about current worker state,
- // number of worker executions, last status change time.
+ // number of worker executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
state *state
@@ -60,7 +59,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) {
w := &Worker{
Created: time.Now(),
cmd: cmd,
- err: &errBuffer{buffer: new(bytes.Buffer)},
+ err: newErrBuffer(),
waitDone: make(chan interface{}),
state: newState(StateInactive),
}
@@ -212,6 +211,8 @@ func (w *Worker) start() error {
if w.rl != nil {
w.rl.Close()
}
+
+ w.err.Close()
}
}()