summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
committerValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
commitcc271dceb13d3929f0382311dfce3dfed2ce04ce (patch)
tree13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/websockets/plugin.go
parenta8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff)
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go14
1 files changed, 6 insertions, 8 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf21fffa..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- // attach default driver
- if len(p.pubsubs) == 0 {
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
- }
-
p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
@@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return err
@@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")