summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
committerValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
commitcc271dceb13d3929f0382311dfce3dfed2ce04ce (patch)
tree13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/websockets
parenta8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff)
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/executor/executor.go4
-rw-r--r--plugins/websockets/memory/inMemory.go6
-rw-r--r--plugins/websockets/plugin.go14
-rw-r--r--plugins/websockets/pool/workers_pool.go8
-rw-r--r--plugins/websockets/rpc.go6
5 files changed, 18 insertions, 20 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 951c9a1a..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,8 +8,8 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
index deb927ed..cef28182 100644
--- a/plugins/websockets/memory/inMemory.go
+++ b/plugins/websockets/memory/inMemory.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
return nil, err
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf21fffa..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- // attach default driver
- if len(p.pubsubs) == 0 {
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
- }
-
p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
@@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return err
@@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index efafb2d3..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
)
@@ -16,7 +16,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -24,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index ef44884a..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -2,7 +2,7 @@ package websockets
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -15,7 +15,7 @@ type rpc struct {
// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *message.Messages, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
// just return in case of nil message
@@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error {
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("publish_async")
// just return in case of nil message