diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
commit | cc271dceb13d3929f0382311dfce3dfed2ce04ce (patch) | |
tree | 13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/websockets | |
parent | a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff) |
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/executor/executor.go | 4 | ||||
-rw-r--r-- | plugins/websockets/memory/inMemory.go | 6 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 14 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 8 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 6 |
5 files changed, 18 insertions, 20 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 951c9a1a..e3d47166 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -8,8 +8,8 @@ import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - msg := &message.Message{} + msg := &websocketsv1.Message{} err = json.Unmarshal(data, msg) if err != nil { diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go index deb927ed..cef28182 100644 --- a/plugins/websockets/memory/inMemory.go +++ b/plugins/websockets/memory/inMemory.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } } -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return nil, err diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index cf21fffa..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - // attach default driver - if len(p.pubsubs) == 0 { - p.pubsubs["memory"] = memory.NewInMemory(p.log) - } - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, @@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { return err @@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index efafb2d3..1a7c6f8a 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/fasthttp/websocket" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" ) @@ -16,7 +16,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *message.Message + queue chan *websocketsv1.Message exit chan struct{} } @@ -24,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *message.Message, 100), + queue: make(chan *websocketsv1.Message, 100), storage: pubsubs, log: log, exit: make(chan struct{}), @@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *message.Message) { +func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { wp.queue <- msg } diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index ef44884a..00c1dd91 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -2,7 +2,7 @@ package websockets import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -15,7 +15,7 @@ type rpc struct { // Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(in *message.Messages, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") // just return in case of nil message @@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error { +func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("publish_async") // just return in case of nil message |