summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--payload.go27
-rw-r--r--service/container_test.go2
2 files changed, 27 insertions, 2 deletions
diff --git a/payload.go b/payload.go
index 4b5003e4..9e1366df 100644
--- a/payload.go
+++ b/payload.go
@@ -1,5 +1,9 @@
package roadrunner
+import (
+ "bufio"
+)
+
// Payload carries binary header and body to workers and
// back to the server.
type Payload struct {
@@ -9,10 +13,31 @@ type Payload struct {
// body contains binary payload to be processed by worker.
Body []byte
- // add io.Reader support for streamed requests and responses.
+ // attached when worker responds with the stream
+ stream *bufio.Reader
+
+ // close callback will be called when payload is closed
+ cc func()
}
// String returns payload body as string
func (p *Payload) String() string {
return string(p.Body)
}
+
+// Stream returns true is payload is streaming.
+func (p *Payload) Stream() bool {
+ return p.stream != nil
+}
+
+// Stream returns associated stream.
+func (p *Payload) Read(d []byte) (n int, err error) {
+ return p.stream.Read(d)
+}
+
+// Close closes underlying stream and notifies stream end watchers.
+func (p *Payload) Close() {
+ if p.cc != nil {
+ p.cc()
+ }
+}
diff --git a/service/container_test.go b/service/container_test.go
index 8eaf13d4..ecc89160 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -57,11 +57,11 @@ func (t *testService) Stop() {
t.mu.Lock()
defer t.mu.Unlock()
+ close(t.serving)
if t.serving == nil {
return
}
- close(t.serving)
t.serving = nil
}