diff options
-rw-r--r-- | payload.go | 27 | ||||
-rw-r--r-- | service/container_test.go | 2 |
2 files changed, 27 insertions, 2 deletions
@@ -1,5 +1,9 @@ package roadrunner +import ( + "bufio" +) + // Payload carries binary header and body to workers and // back to the server. type Payload struct { @@ -9,10 +13,31 @@ type Payload struct { // body contains binary payload to be processed by worker. Body []byte - // add io.Reader support for streamed requests and responses. + // attached when worker responds with the stream + stream *bufio.Reader + + // close callback will be called when payload is closed + cc func() } // String returns payload body as string func (p *Payload) String() string { return string(p.Body) } + +// Stream returns true is payload is streaming. +func (p *Payload) Stream() bool { + return p.stream != nil +} + +// Stream returns associated stream. +func (p *Payload) Read(d []byte) (n int, err error) { + return p.stream.Read(d) +} + +// Close closes underlying stream and notifies stream end watchers. +func (p *Payload) Close() { + if p.cc != nil { + p.cc() + } +} diff --git a/service/container_test.go b/service/container_test.go index 8eaf13d4..ecc89160 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -57,11 +57,11 @@ func (t *testService) Stop() { t.mu.Lock() defer t.mu.Unlock() + close(t.serving) if t.serving == nil { return } - close(t.serving) t.serving = nil } |