diff options
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 2df23f11..395b056f 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -13,7 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/common/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/state/process" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" @@ -58,6 +58,10 @@ type Plugin struct { // server which produces commands to the pool server server.Server + // stop receiving messages + cancel context.CancelFunc + ctx context.Context + // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn } @@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.server = server p.log = log p.broadcaster = b + + ctx, cancel := context.WithCancel(context.Background()) + p.ctx = ctx + p.cancel = cancel return nil } @@ -118,7 +126,8 @@ func (p *Plugin) Serve() chan error { Supervisor: p.cfg.Pool.Supervisor, }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) if err != nil { - errCh <- err + errCh <- errors.E(op, err) + return } p.accessValidator = p.defaultAccessValidator(p.phpPool) @@ -129,17 +138,17 @@ func (p *Plugin) Serve() chan error { // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { - select { - case <-p.serveExit: - return - default: - data, err := ps.Next() - if err != nil { - errCh <- err + data, err := ps.Next(p.ctx) + if err != nil { + if errors.Is(errors.TimeOut, err) { return } - p.workersPool.Queue(data) + + errCh <- errors.E(op, err) + return } + + p.workersPool.Queue(data) } }(p.subReader) @@ -149,6 +158,8 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() + // cancel context + p.cancel() p.Lock() if p.phpPool == nil { p.Unlock() |