summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-30 19:35:28 +0300
committerValery Piashchynski <[email protected]>2021-05-30 19:36:18 +0300
commit0ee91dc24d3e68706d89092c06b1c0d09dab0353 (patch)
treeb4699fc334ba7b13be475d54021d980960cff991 /plugins
parent8188b34646e29861071ab5270160a9665eb1832f (diff)
- Do not use response.writer in hijacked connection
- Remove atomics from the ws Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/channel/plugin.go9
-rw-r--r--plugins/http/plugin.go32
-rw-r--r--plugins/websockets/plugin.go12
3 files changed, 28 insertions, 25 deletions
diff --git a/plugins/channel/plugin.go b/plugins/channel/plugin.go
index 9afd1264..362dbc07 100644
--- a/plugins/channel/plugin.go
+++ b/plugins/channel/plugin.go
@@ -28,6 +28,15 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
+ // read from the channels on stop to prevent blocking
+ go func() {
+ for range p.fromCh {
+ }
+ }()
+ go func() {
+ for range p.toCh {
+ }
+ }()
return nil
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 397de7ae..c4a1a83f 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -6,8 +6,8 @@ import (
"log"
"net/http"
"sync"
+ "time"
- "github.com/hashicorp/go-multierror"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
@@ -126,7 +126,6 @@ func (p *Plugin) logCallback(event interface{}) {
// Serve serves the svc.
func (p *Plugin) Serve() chan error {
errCh := make(chan error, 2)
- go p.messages()
// run whole process in the goroutine
go func() {
// protect http initialization
@@ -169,9 +168,16 @@ func (p *Plugin) serve(errCh chan error) {
if p.cfg.EnableHTTP() {
if p.cfg.EnableH2C() {
- p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog}
+ p.http = &http.Server{
+ Handler: h2c.NewHandler(p, &http2.Server{}),
+ ErrorLog: p.stdLog,
+ }
} else {
- p.http = &http.Server{Handler: p, ErrorLog: p.stdLog}
+ p.http = &http.Server{
+ Handler: p,
+ ErrorLog: p.stdLog,
+ ReadHeaderTimeout: time.Second,
+ }
}
}
@@ -210,6 +216,9 @@ func (p *Plugin) serve(errCh chan error) {
go func() {
p.serveFCGI(errCh)
}()
+
+ // read messages from the ws
+ go p.messages()
}
// Stop stops the http.
@@ -217,31 +226,24 @@ func (p *Plugin) Stop() error {
p.Lock()
defer p.Unlock()
- var err error
if p.fcgi != nil {
- err = p.fcgi.Shutdown(context.Background())
+ err := p.fcgi.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
p.log.Error("error shutting down the fcgi server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
}
}
if p.https != nil {
- err = p.https.Shutdown(context.Background())
+ err := p.https.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
p.log.Error("error shutting down the https server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
}
}
if p.http != nil {
- err = p.http.Shutdown(context.Background())
+ err := p.http.Close()
if err != nil && err != http.ErrServerClosed {
p.log.Error("error shutting down the http server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
}
}
@@ -250,7 +252,7 @@ func (p *Plugin) Stop() error {
p.pool.Destroy(context.Background())
}
- return err
+ return nil
}
// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index b3495e77..c51c7ca1 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -3,7 +3,6 @@ package websockets
import (
"net/http"
"sync"
- "sync/atomic"
"time"
"github.com/fasthttp/websocket"
@@ -40,7 +39,6 @@ type Plugin struct {
// GO workers pool
workersPool *pool.WorkersPool
- stopped uint64
hub channel.Hub
}
@@ -61,7 +59,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, channel channel.
p.storage = storage.NewStorage()
p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
p.hub = channel
- p.stopped = 0
return nil
}
@@ -87,7 +84,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- atomic.AddUint64(&p.stopped, 1)
p.workersPool.Stop()
return nil
}
@@ -123,11 +119,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
return
}
- if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) {
- // plugin stopped
- return
- }
-
r = attributes.Init(r)
err := validator.NewValidator().AssertServerAccess(p.hub, r)
@@ -156,7 +147,8 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// upgrade connection to websocket connection
_conn, err := upgraded.Upgrade(w, r, nil)
if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
+ // connection hijacked, do not use response.writer or request
+ p.log.Error("upgrade connection error", "error", err)
return
}