diff options
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 14 |
1 files changed, 6 insertions, 8 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index cf21fffa..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - // attach default driver - if len(p.pubsubs) == 0 { - p.pubsubs["memory"] = memory.NewInMemory(p.log) - } - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, @@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { return err @@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") |