diff options
-rw-r--r-- | CHANGELOG.md | 7 | ||||
-rwxr-xr-x | build.sh | 7 | ||||
-rw-r--r-- | cmd/rr/cmd/serve.go | 9 | ||||
-rw-r--r-- | cmd/rr/cmd/version.go | 8 | ||||
-rw-r--r-- | cmd/rr/debug/debugger.go | 10 | ||||
-rw-r--r-- | error_buffer.go | 108 | ||||
-rw-r--r-- | error_buffer_test.go | 57 | ||||
-rw-r--r-- | php-src/tests/http/echoerr.php | 12 | ||||
-rw-r--r-- | server_config.go | 5 | ||||
-rw-r--r-- | service/http/handler_test.go | 18 | ||||
-rw-r--r-- | service/http/rpc_test.go | 66 | ||||
-rw-r--r-- | service/http/service.go | 23 | ||||
-rw-r--r-- | service/http/service_test.go | 60 | ||||
-rw-r--r-- | service/rpc/config.go | 5 | ||||
-rw-r--r-- | service/rpc/service.go | 2 | ||||
-rw-r--r-- | static_pool.go | 24 | ||||
-rw-r--r-- | worker.go | 7 |
17 files changed, 368 insertions, 60 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a4eb922..7819504f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ CHANGELOG ========= +v1.0.2 (23.06.2018) +------- +- rr would provide error log from workers in realtime now +- even better service shutdown +- safer unix socket allocation +- minor CS + v1.0.2 (19.06.2018) ------- - more validations for user configs @@ -1,11 +1,8 @@ #!/bin/bash -set -e - cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" - # Pushes application version into the build information. -RR_VERSION=1.0.1 +RR_VERSION=1.0.3 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" @@ -47,4 +44,4 @@ if [ "$1" == "all" ]; then exit fi -CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go
\ No newline at end of file +CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go index c53f7ce9..664baf15 100644 --- a/cmd/rr/cmd/serve.go +++ b/cmd/rr/cmd/serve.go @@ -36,12 +36,15 @@ func init() { RunE: serveHandler, }) - signal.Notify(stopSignal, syscall.SIGTERM) - signal.Notify(stopSignal, syscall.SIGINT) + signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM) } func serveHandler(cmd *cobra.Command, args []string) error { - go Container.Serve() + go func() { + Container.Serve() + stopSignal <- nil + }() + <-stopSignal Container.Stop() diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go index 5edb7543..b134a70b 100644 --- a/cmd/rr/cmd/version.go +++ b/cmd/rr/cmd/version.go @@ -1,6 +1,10 @@ package cmd +import "time" + var ( - Version = "1.0.0" // Placeholder for the version - BuildTime = "development" // Placeholder for the build time + // Version - defines build version. + Version = "development" + // BuildTime - defined build time. + BuildTime = time.Now() ) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 0621285b..0dca43de 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/cmd/rr/utils" "github.com/spiral/roadrunner/service/http" + "strings" ) // Listener creates new debug listener. @@ -45,7 +46,6 @@ func (s *debugger) listener(event int, ctx interface{}) { "<white+hb>worker.%v</reset> <yellow>killed</red>", *w.Pid, )) - case roadrunner.EventWorkerError: err := ctx.(roadrunner.WorkerError) s.logger.Error(utils.Sprintf( @@ -55,6 +55,12 @@ func (s *debugger) listener(event int, ctx interface{}) { )) } + // outputs + switch event { + case roadrunner.EventStderrOutput: + s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n")) + } + // rr server events switch event { case roadrunner.EventServerFailure: @@ -68,6 +74,8 @@ func (s *debugger) listener(event int, ctx interface{}) { case roadrunner.EventPoolError: s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx)) } + + //s.logger.Warning(event, ctx) } func statusColor(status int) string { diff --git a/error_buffer.go b/error_buffer.go index fcf566c8..8be9c5a8 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -1,39 +1,111 @@ package roadrunner import ( - "bytes" "sync" + "time" +) + +const ( + // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). + EventStderrOutput = 1900 + + // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output + // together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond ) // thread safe errBuffer type errBuffer struct { mu sync.Mutex - buffer *bytes.Buffer + buf []byte + last int + wait *time.Timer + update chan interface{} + stop chan interface{} + lsn func(event int, ctx interface{}) +} + +func newErrBuffer() *errBuffer { + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + } + + go func() { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return + } + } + }() + + return eb +} + +// Listen attaches error stream even listener. +func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.lsn = l } -// Len returns the number of bytes of the unread portion of the errBuffer; -// b.Len() == len(b.Bytes()). -func (b *errBuffer) Len() int { - b.mu.Lock() - defer b.mu.Unlock() +// Len returns the number of buf of the unread portion of the errBuffer; +// buf.Len() == len(buf.Bytes()). +func (eb *errBuffer) Len() int { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Len() + // currently active message + return len(eb.buf) } // Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. If the -// errBuffer becomes too large, Write will panic with ErrTooLarge. -func (b *errBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Write(p) + eb.buf = append(eb.buf, p...) + eb.update <- nil + + return len(p), nil } // Strings fetches all errBuffer data into string. -func (b *errBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() + + return string(eb.buf) +} - return b.buffer.String() +// Close aggregation timer. +func (eb *errBuffer) Close() error { + close(eb.stop) + return nil } diff --git a/error_buffer_test.go b/error_buffer_test.go index afbc80e2..09ea4f03 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -1,14 +1,65 @@ package roadrunner import ( - "bytes" "github.com/stretchr/testify/assert" "testing" ) func TestErrBuffer_Write_Len(t *testing.T) { - buf := &errBuffer{buffer: new(bytes.Buffer)} + buf := newErrBuffer() + defer buf.Close() + buf.Write([]byte("hello")) assert.Equal(t, 5, buf.Len()) - assert.Equal(t, buf.String(), "hello") + assert.Equal(t, "hello", buf.String()) +} + +func TestErrBuffer_Write_Event(t *testing.T) { + buf := newErrBuffer() + defer buf.Close() + + tr := make(chan interface{}) + buf.Listen(func(event int, ctx interface{}) { + assert.Equal(t, EventStderrOutput, event) + assert.Equal(t, []byte("hello\n"), ctx) + close(tr) + }) + + buf.Write([]byte("hello\n")) + + <-tr + + // messages are read + assert.Equal(t, 6, buf.Len()) + assert.Equal(t, "hello\n", buf.String()) +} + +func TestErrBuffer_Write_Event_Separated(t *testing.T) { + buf := newErrBuffer() + defer buf.Close() + + tr := make(chan interface{}) + buf.Listen(func(event int, ctx interface{}) { + assert.Equal(t, EventStderrOutput, event) + assert.Equal(t, []byte("hello\nending"), ctx) + close(tr) + }) + + buf.Write([]byte("hel")) + buf.Write([]byte("lo\n")) + buf.Write([]byte("ending")) + + <-tr + assert.Equal(t, 12, buf.Len()) + assert.Equal(t, "hello\nending", buf.String()) +} + +func TestErrBuffer_Write_Remaining(t *testing.T) { + buf := newErrBuffer() + defer buf.Close() + + buf.Write([]byte("hel")) + + assert.Equal(t, 3, buf.Len()) + assert.Equal(t, "hel", buf.String()) } diff --git a/php-src/tests/http/echoerr.php b/php-src/tests/http/echoerr.php new file mode 100644 index 00000000..da2ff4d8 --- /dev/null +++ b/php-src/tests/http/echoerr.php @@ -0,0 +1,12 @@ +<?php + +use \Psr\Http\Message\ServerRequestInterface; +use \Psr\Http\Message\ResponseInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + error_log(strtoupper($req->getQueryParams()['hello'])); + + $resp->getBody()->write(strtoupper($req->getQueryParams()['hello'])); + return $resp->withStatus(201); +}
\ No newline at end of file diff --git a/server_config.go b/server_config.go index ecd7dd2b..b927c8c6 100644 --- a/server_config.go +++ b/server_config.go @@ -5,6 +5,7 @@ import ( "net" "os/exec" "strings" + "syscall" "time" ) @@ -51,6 +52,10 @@ func (cfg *ServerConfig) makeFactory() (Factory, error) { return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)") } + if dsn[0] == "unix" { + syscall.Unlink(dsn[1]) + } + ln, err := net.Listen(dsn[0], dsn[1]) if err != nil { return nil, err diff --git a/service/http/handler_test.go b/service/http/handler_test.go index 4a11c562..59a4c7c0 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -51,13 +51,13 @@ func TestServer_Echo(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - body, r, err := get("http://localhost:8077/?hello=world") + body, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 201, r.StatusCode) assert.Equal(t, "WORLD", body) @@ -686,13 +686,13 @@ func TestServer_Error(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8077/?hello=world") + _, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) } @@ -720,13 +720,13 @@ func TestServer_Error2(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8077/?hello=world") + _, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) } @@ -754,7 +754,7 @@ func TestServer_Error3(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -799,7 +799,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { st.rr.Start() defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -807,7 +807,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { bb := "WORLD" for n := 0; n < b.N; n++ { - r, err := http.Get("http://localhost:8077/?hello=world") + r, err := http.Get("http://localhost:8177/?hello=world") if err != nil { b.Fail() } diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go index fc47a70f..c392b060 100644 --- a/service/http/rpc_test.go +++ b/service/http/rpc_test.go @@ -1,11 +1,14 @@ package http import ( + "encoding/json" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/rpc" "github.com/stretchr/testify/assert" + "os" + "runtime" "strconv" "testing" "time" @@ -65,6 +68,67 @@ func Test_RPC(t *testing.T) { assert.NotEqual(t, res, res2) } +func Test_RPC_Unix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + sock := `unix://` + os.TempDir() + `/rpc.unix` + j, _ := json.Marshal(sock) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":` + string(j) + `}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + res, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) + + cl, err := rs.Client() + assert.NoError(t, err) + + r := "" + assert.NoError(t, cl.Call("http.Reset", true, &r)) + assert.Equal(t, "OK", r) + + res2, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) + assert.NotEqual(t, res, res2) +} + func Test_Workers(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -74,7 +138,7 @@ func Test_Workers(t *testing.T) { c.Register(ID, &Service{}) assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, + rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, httpCfg: `{ "enable": true, "address": ":6029", diff --git a/service/http/service.go b/service/http/service.go index 3d200845..cef019b3 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -7,6 +7,7 @@ import ( "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" + "sync/atomic" ) // ID contains default svc name. @@ -21,10 +22,11 @@ type Service struct { lsns []func(event int, ctx interface{}) mdws []middleware - mu sync.Mutex - rr *roadrunner.Server - srv *Handler - http *http.Server + mu sync.Mutex + rr *roadrunner.Server + stopping int32 + srv *Handler + http *http.Server } // AddMiddleware adds new net/http middleware. @@ -95,6 +97,11 @@ func (s *Service) Serve() error { // Stop stops the svc. func (s *Service) Stop() { + if atomic.LoadInt32(&s.stopping) != 0 { + // already stopping + return + } + s.mu.Lock() defer s.mu.Unlock() if s.http == nil { @@ -121,9 +128,11 @@ func (s *Service) listener(event int, ctx interface{}) { } if event == roadrunner.EventServerFailure { - // attempting rr server restart - if err := s.rr.Start(); err != nil { - s.Stop() + if atomic.LoadInt32(&s.stopping) != 0 { + // attempting rr server restart + if err := s.rr.Start(); err != nil { + s.Stop() + } } } } diff --git a/service/http/service_test.go b/service/http/service_test.go index 55fa660b..02d1c3f0 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) { assert.Equal(t, "WORLD", string(b)) } +func Test_Service_ErrorEcho(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echoerr pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + goterr := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventStderrOutput { + if string(ctx.([]byte)) == "WORLD\n" { + goterr <- nil + } + } + }) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + <-goterr + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) +} + func Test_Service_Middleware(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) diff --git a/service/rpc/config.go b/service/rpc/config.go index 06d63d65..e3168945 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -4,6 +4,7 @@ import ( "errors" "net" "strings" + "syscall" ) type config struct { @@ -21,6 +22,10 @@ func (cfg *config) listener() (net.Listener, error) { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } + if dsn[0] == "unix" { + syscall.Unlink(dsn[1]) + } + return net.Listen(dsn[0], dsn[1]) } diff --git a/service/rpc/service.go b/service/rpc/service.go index e1147754..82f26407 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -60,7 +60,7 @@ func (s *Service) Serve() error { for { select { case <-s.stop: - break + return default: conn, err := ln.Accept() if err != nil { diff --git a/static_pool.go b/static_pool.go index b3e4f488..b974ac90 100644 --- a/static_pool.go +++ b/static_pool.go @@ -28,10 +28,10 @@ type StaticPool struct { // active task executions tasks sync.WaitGroup - // workers circular allocation buffer + // workers circular allocation buf free chan *Worker - // number of workers expected to be dead in a buffer. + // number of workers expected to be dead in a buf. numDead int64 // protects state of worker list, does not affect allocation @@ -40,7 +40,7 @@ type StaticPool struct { // all registered workers workers []*Worker - // pool is being destroying + // pool is being destroyed inDestroy int32 // lsn is optional callback to handle worker create/destruct/error events. @@ -83,6 +83,12 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) { defer p.mul.Unlock() p.lsn = l + + p.muw.Lock() + for _, w := range p.workers { + w.err.Listen(p.lsn) + } + p.muw.Unlock() } // Config returns associated pool configuration. Immutable. @@ -138,14 +144,12 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { // Destroy all underlying workers (but let them to complete the task). func (p *StaticPool) Destroy() { atomic.AddInt32(&p.inDestroy, 1) - defer atomic.AddInt32(&p.inDestroy, -1) p.tasks.Wait() var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) - go w.Stop() go func(w *Worker) { defer wg.Done() p.destroyWorker(w, nil) @@ -207,6 +211,12 @@ func (p *StaticPool) createWorker() (*Worker, error) { return nil, err } + p.mul.Lock() + if p.lsn != nil { + w.err.Listen(p.lsn) + } + p.mul.Unlock() + p.throw(EventWorkerConstruct, w) p.muw.Lock() @@ -259,7 +269,7 @@ func (p *StaticPool) watchWorker(w *Worker) { p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) } - if !p.destroying() { + if !p.destroyed() { nw, err := p.createWorker() if err == nil { p.free <- nw @@ -275,7 +285,7 @@ func (p *StaticPool) watchWorker(w *Worker) { } } -func (p *StaticPool) destroying() bool { +func (p *StaticPool) destroyed() bool { return atomic.LoadInt32(&p.inDestroy) != 0 } @@ -1,7 +1,6 @@ package roadrunner import ( - "bytes" "fmt" "github.com/pkg/errors" "github.com/spiral/goridge" @@ -24,7 +23,7 @@ type Worker struct { Created time.Time // state holds information about current worker state, - // number of worker executions, last status change time. + // number of worker executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. state *state @@ -60,7 +59,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) { w := &Worker{ Created: time.Now(), cmd: cmd, - err: &errBuffer{buffer: new(bytes.Buffer)}, + err: newErrBuffer(), waitDone: make(chan interface{}), state: newState(StateInactive), } @@ -212,6 +211,8 @@ func (w *Worker) start() error { if w.rl != nil { w.rl.Close() } + + w.err.Close() } }() |