diff options
author | Valery Piashchynski <[email protected]> | 2021-05-30 19:35:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-30 19:36:18 +0300 |
commit | 0ee91dc24d3e68706d89092c06b1c0d09dab0353 (patch) | |
tree | b4699fc334ba7b13be475d54021d980960cff991 /plugins | |
parent | 8188b34646e29861071ab5270160a9665eb1832f (diff) |
- Do not use response.writer in hijacked connection
- Remove atomics from the ws
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/channel/plugin.go | 9 | ||||
-rw-r--r-- | plugins/http/plugin.go | 32 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 12 |
3 files changed, 28 insertions, 25 deletions
diff --git a/plugins/channel/plugin.go b/plugins/channel/plugin.go index 9afd1264..362dbc07 100644 --- a/plugins/channel/plugin.go +++ b/plugins/channel/plugin.go @@ -28,6 +28,15 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // read from the channels on stop to prevent blocking + go func() { + for range p.fromCh { + } + }() + go func() { + for range p.toCh { + } + }() return nil } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 397de7ae..c4a1a83f 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -6,8 +6,8 @@ import ( "log" "net/http" "sync" + "time" - "github.com/hashicorp/go-multierror" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" @@ -126,7 +126,6 @@ func (p *Plugin) logCallback(event interface{}) { // Serve serves the svc. func (p *Plugin) Serve() chan error { errCh := make(chan error, 2) - go p.messages() // run whole process in the goroutine go func() { // protect http initialization @@ -169,9 +168,16 @@ func (p *Plugin) serve(errCh chan error) { if p.cfg.EnableHTTP() { if p.cfg.EnableH2C() { - p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog} + p.http = &http.Server{ + Handler: h2c.NewHandler(p, &http2.Server{}), + ErrorLog: p.stdLog, + } } else { - p.http = &http.Server{Handler: p, ErrorLog: p.stdLog} + p.http = &http.Server{ + Handler: p, + ErrorLog: p.stdLog, + ReadHeaderTimeout: time.Second, + } } } @@ -210,6 +216,9 @@ func (p *Plugin) serve(errCh chan error) { go func() { p.serveFCGI(errCh) }() + + // read messages from the ws + go p.messages() } // Stop stops the http. @@ -217,31 +226,24 @@ func (p *Plugin) Stop() error { p.Lock() defer p.Unlock() - var err error if p.fcgi != nil { - err = p.fcgi.Shutdown(context.Background()) + err := p.fcgi.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { p.log.Error("error shutting down the fcgi server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) } } if p.https != nil { - err = p.https.Shutdown(context.Background()) + err := p.https.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { p.log.Error("error shutting down the https server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) } } if p.http != nil { - err = p.http.Shutdown(context.Background()) + err := p.http.Close() if err != nil && err != http.ErrServerClosed { p.log.Error("error shutting down the http server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) } } @@ -250,7 +252,7 @@ func (p *Plugin) Stop() error { p.pool.Destroy(context.Background()) } - return err + return nil } // ServeHTTP handles connection using set of middleware and pool PSR-7 server. diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index b3495e77..c51c7ca1 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -3,7 +3,6 @@ package websockets import ( "net/http" "sync" - "sync/atomic" "time" "github.com/fasthttp/websocket" @@ -40,7 +39,6 @@ type Plugin struct { // GO workers pool workersPool *pool.WorkersPool - stopped uint64 hub channel.Hub } @@ -61,7 +59,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, channel channel. p.storage = storage.NewStorage() p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) p.hub = channel - p.stopped = 0 return nil } @@ -87,7 +84,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - atomic.AddUint64(&p.stopped, 1) p.workersPool.Stop() return nil } @@ -123,11 +119,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { return } - if atomic.CompareAndSwapUint64(&p.stopped, 1, 1) { - // plugin stopped - return - } - r = attributes.Init(r) err := validator.NewValidator().AssertServerAccess(p.hub, r) @@ -156,7 +147,8 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // upgrade connection to websocket connection _conn, err := upgraded.Upgrade(w, r, nil) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + // connection hijacked, do not use response.writer or request + p.log.Error("upgrade connection error", "error", err) return } |