summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go6
-rw-r--r--plugins/kv/drivers/memcached/driver.go6
-rw-r--r--plugins/kv/drivers/memory/driver.go18
-rw-r--r--plugins/kv/drivers/redis/driver.go6
-rw-r--r--plugins/kv/interface.go6
-rw-r--r--plugins/kv/payload.proto17
-rw-r--r--plugins/kv/payload/payload.pb.go234
-rw-r--r--plugins/kv/rpc.go14
-rw-r--r--plugins/redis/fanin.go10
-rw-r--r--plugins/redis/plugin.go8
-rw-r--r--plugins/websockets/executor/executor.go4
-rw-r--r--plugins/websockets/memory/inMemory.go6
-rw-r--r--plugins/websockets/plugin.go14
-rw-r--r--plugins/websockets/pool/workers_pool.go8
-rw-r--r--plugins/websockets/rpc.go6
15 files changed, 55 insertions, 308 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index af107dff..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,9 +10,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
@@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 9c4689c4..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,9 +6,9 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
if items[i] == nil {
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index fae2c831..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(*payload.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(*payload.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...*payload.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*payload.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error {
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(*payload.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &payload.Item{
+ s.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(*payload.Item).Timeout
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -216,7 +216,7 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(*payload.Item)
+ v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index 5890367b..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,9 +7,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index abcd7f47..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -16,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...*payload.Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...*payload.Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto
deleted file mode 100644
index 66a22ee4..00000000
--- a/plugins/kv/payload.proto
+++ /dev/null
@@ -1,17 +0,0 @@
-syntax = "proto3";
-
-package kv.v1;
-option go_package = "./payload";
-
-message Payload {
- // could be an enum in the future
- string storage = 1;
- repeated Item items = 2;
-}
-
-message Item {
- string key = 1;
- string value = 2;
- // RFC 3339
- string timeout = 3;
-}
diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go
deleted file mode 100644
index 60ad9f13..00000000
--- a/plugins/kv/payload/payload.pb.go
+++ /dev/null
@@ -1,234 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
-// source: payload.proto
-
-package payload
-
-import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
- reflect "reflect"
- sync "sync"
-)
-
-const (
- // Verify that this generated code is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
- // Verify that runtime/protoimpl is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
-)
-
-type Payload struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- // could be an enum in the future
- Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"`
- Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
-}
-
-func (x *Payload) Reset() {
- *x = Payload{}
- if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[0]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Payload) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Payload) ProtoMessage() {}
-
-func (x *Payload) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[0]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
-func (*Payload) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{0}
-}
-
-func (x *Payload) GetStorage() string {
- if x != nil {
- return x.Storage
- }
- return ""
-}
-
-func (x *Payload) GetItems() []*Item {
- if x != nil {
- return x.Items
- }
- return nil
-}
-
-type Item struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
- Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
- Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
-}
-
-func (x *Item) Reset() {
- *x = Item{}
- if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[1]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Item) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Item) ProtoMessage() {}
-
-func (x *Item) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[1]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Item.ProtoReflect.Descriptor instead.
-func (*Item) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{1}
-}
-
-func (x *Item) GetKey() string {
- if x != nil {
- return x.Key
- }
- return ""
-}
-
-func (x *Item) GetValue() string {
- if x != nil {
- return x.Value
- }
- return ""
-}
-
-func (x *Item) GetTimeout() string {
- if x != nil {
- return x.Timeout
- }
- return ""
-}
-
-var File_payload_proto protoreflect.FileDescriptor
-
-var file_payload_proto_rawDesc = []byte{
- 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
- 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18,
- 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d,
- 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65,
- 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d,
- 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
- 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
- 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
- 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
-}
-
-var (
- file_payload_proto_rawDescOnce sync.Once
- file_payload_proto_rawDescData = file_payload_proto_rawDesc
-)
-
-func file_payload_proto_rawDescGZIP() []byte {
- file_payload_proto_rawDescOnce.Do(func() {
- file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData)
- })
- return file_payload_proto_rawDescData
-}
-
-var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
-var file_payload_proto_goTypes = []interface{}{
- (*Payload)(nil), // 0: kv.Payload
- (*Item)(nil), // 1: kv.Item
-}
-var file_payload_proto_depIdxs = []int32{
- 1, // 0: kv.Payload.items:type_name -> kv.Item
- 1, // [1:1] is the sub-list for method output_type
- 1, // [1:1] is the sub-list for method input_type
- 1, // [1:1] is the sub-list for extension type_name
- 1, // [1:1] is the sub-list for extension extendee
- 0, // [0:1] is the sub-list for field type_name
-}
-
-func init() { file_payload_proto_init() }
-func file_payload_proto_init() {
- if File_payload_proto != nil {
- return
- }
- if !protoimpl.UnsafeEnabled {
- file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Payload); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Item); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- }
- type x struct{}
- out := protoimpl.TypeBuilder{
- File: protoimpl.DescBuilder{
- GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_payload_proto_rawDesc,
- NumEnums: 0,
- NumMessages: 2,
- NumExtensions: 0,
- NumServices: 0,
- },
- GoTypes: file_payload_proto_goTypes,
- DependencyIndexes: file_payload_proto_depIdxs,
- MessageInfos: file_payload_proto_msgTypes,
- }.Build()
- File_payload_proto = out.File
- file_payload_proto_rawDesc = nil
- file_payload_proto_goTypes = nil
- file_payload_proto_depIdxs = nil
-}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 009a5c56..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,7 +2,7 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -17,7 +17,7 @@ type rpc struct {
}
// Has accept []*payload.Payload proto payload with Storage and Item
-func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
if in.Storage == "" {
@@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
}
// Set accept proto payload with Storage and Item
-func (r *rpc) Set(in *payload.Payload, ok *bool) error {
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error {
}
// MGet accept proto payload with Storage and Item
-func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
keys := make([]string, 0, len(in.GetItems()))
@@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
}
// MExpire accept proto payload with Storage and Item
-func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
}
// TTL accept proto payload with Storage and Item
-func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
keys := make([]string, 0, len(in.GetItems()))
@@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
}
// Delete accept proto payload with Storage and Item
-func (r *rpc) Delete(in *payload.Payload, ok *bool) error {
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
keys := make([]string, 0, len(in.GetItems()))
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 76bef400..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,7 +4,7 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -23,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -67,7 +67,7 @@ func (fi *FanIn) read() {
return
}
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
if err != nil {
fi.log.Error("message unmarshal")
@@ -97,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 695e7b08..47ffeb39 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -107,7 +107,7 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
return errors.E(err)
@@ -126,7 +126,7 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
p.log.Error("message unmarshal error")
@@ -209,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
return <-p.fanin.consume(), nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 951c9a1a..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,8 +8,8 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
index deb927ed..cef28182 100644
--- a/plugins/websockets/memory/inMemory.go
+++ b/plugins/websockets/memory/inMemory.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
return nil, err
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf21fffa..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- // attach default driver
- if len(p.pubsubs) == 0 {
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
- }
-
p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
@@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return err
@@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index efafb2d3..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
)
@@ -16,7 +16,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -24,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index ef44884a..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -2,7 +2,7 @@ package websockets
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -15,7 +15,7 @@ type rpc struct {
// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *message.Messages, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
// just return in case of nil message
@@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error {
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("publish_async")
// just return in case of nil message