summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go17
-rw-r--r--plugins/kv/drivers/memcached/driver.go18
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/driver.go43
-rw-r--r--plugins/kv/drivers/redis/driver.go20
-rw-r--r--plugins/kv/interface.go14
-rw-r--r--plugins/kv/payload.proto17
-rw-r--r--plugins/kv/payload/generated/Item.go67
-rw-r--r--plugins/kv/payload/generated/Payload.go71
-rw-r--r--plugins/kv/payload/payload.fbs14
-rw-r--r--plugins/kv/payload/payload.pb.go234
-rw-r--r--plugins/kv/rpc.go153
-rw-r--r--plugins/memory/plugin.go14
-rw-r--r--plugins/redis/fanin.go11
-rw-r--r--plugins/redis/plugin.go27
-rw-r--r--plugins/websockets/executor/executor.go3
-rw-r--r--plugins/websockets/plugin.go34
-rw-r--r--plugins/websockets/pool/workers_pool.go46
-rw-r--r--plugins/websockets/rpc.go91
19 files changed, 451 insertions, 445 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 0f647cb1..af107dff 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
@@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*payload.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error {
// if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
// we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
// Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
buf.Reset()
@@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*payload.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
return nil
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 02281ed5..9c4689c4 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*payload.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
- if items[i] == EmptyItem {
+ if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}
@@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error {
}
// add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*payload.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index cde84f42..3997e0d4 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -9,8 +9,6 @@ import (
const PluginName = "memcached"
-var EmptyItem = kv.Item{}
-
type Plugin struct {
// config plugin
cfgPlugin config.Configurer
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index c2494ee7..fae2c831 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(*payload.Item).Value), nil
}
return nil, nil
}
@@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
+ m[keys[i]] = value.(*payload.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...*payload.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
+ if items[i] == nil {
+ continue
+ }
// TTL is set
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...*payload.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
+ if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(kv.Item)
+ tmp := pItem.(*payload.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
+ s.heap.Store(items[i].Key, &payload.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ Timeout: items[i].Timeout,
})
}
}
@@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
+ m[keys[i]] = item.(*payload.Item).Timeout
}
}
return m, nil
@@ -209,12 +216,12 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
+ v := value.(*payload.Item)
+ if v.Timeout == "" {
return true
}
- t, err := time.Parse(time.RFC3339, v.TTL)
+ t, err := time.Parse(time.RFC3339, v.Timeout)
if err != nil {
return false
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index d0b541b2..5890367b 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -9,11 +9,10 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-var EmptyItem = kv.Item{}
-
type Driver struct {
universalClient redis.UniversalClient
log logger.Logger
@@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*payload.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
now := time.Now()
for _, item := range items {
- if item == EmptyItem {
+ if item == nil {
return errors.E(op, errors.EmptyKey)
}
- if item.TTL == "" {
+ if item.Timeout == "" {
err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
if err != nil {
return err
}
} else {
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
@@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*payload.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
- if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ if item == nil {
+ continue
+ }
+ if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 20dbb8b3..abcd7f47 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,14 +1,6 @@
package kv
-// Item represents general storage item
-type Item struct {
- // Key of item
- Key string
- // Value of item
- Value string
- // live until time provided by TTL in RFC 3339 format
- TTL string
-}
+import "github.com/spiral/roadrunner/v2/plugins/kv/payload"
// Storage represents single abstract storage.
type Storage interface {
@@ -24,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...Item) error
+ Set(items ...*payload.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...Item) error
+ MExpire(items ...*payload.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto
new file mode 100644
index 00000000..66a22ee4
--- /dev/null
+++ b/plugins/kv/payload.proto
@@ -0,0 +1,17 @@
+syntax = "proto3";
+
+package kv.v1;
+option go_package = "./payload";
+
+message Payload {
+ // could be an enum in the future
+ string storage = 1;
+ repeated Item items = 2;
+}
+
+message Item {
+ string key = 1;
+ string value = 2;
+ // RFC 3339
+ string timeout = 3;
+}
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
deleted file mode 100644
index 61bd6024..00000000
--- a/plugins/kv/payload/generated/Item.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Item struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Item{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Item) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Item) Key() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Value() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Timeout() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func ItemStart(builder *flatbuffers.Builder) {
- builder.StartObject(3)
-}
-func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
-}
-func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
-}
-func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
-}
-func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
deleted file mode 100644
index a2c6cfdb..00000000
--- a/plugins/kv/payload/generated/Payload.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Payload struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Payload{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Payload) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Payload) Storage() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Payload) Items(obj *Item, j int) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- x := rcv._tab.Vector(o)
- x += flatbuffers.UOffsetT(j) * 4
- x = rcv._tab.Indirect(x)
- obj.Init(rcv._tab.Bytes, x)
- return true
- }
- return false
-}
-
-func (rcv *Payload) ItemsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func PayloadStart(builder *flatbuffers.Builder) {
- builder.StartObject(2)
-}
-func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
-}
-func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
-}
-func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
deleted file mode 100644
index 7e02c1a0..00000000
--- a/plugins/kv/payload/payload.fbs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace generated;
-
-table Payload {
- Storage:string;
- Items:[Item];
-}
-
-table Item {
- Key:string;
- Value:string;
- Timeout:string;
-}
-
-root_type Payload;
diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go
new file mode 100644
index 00000000..60ad9f13
--- /dev/null
+++ b/plugins/kv/payload/payload.pb.go
@@ -0,0 +1,234 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v3.16.0
+// source: payload.proto
+
+package payload
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type Payload struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // could be an enum in the future
+ Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"`
+ Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
+}
+
+func (x *Payload) Reset() {
+ *x = Payload{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_payload_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Payload) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Payload) ProtoMessage() {}
+
+func (x *Payload) ProtoReflect() protoreflect.Message {
+ mi := &file_payload_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
+func (*Payload) Descriptor() ([]byte, []int) {
+ return file_payload_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Payload) GetStorage() string {
+ if x != nil {
+ return x.Storage
+ }
+ return ""
+}
+
+func (x *Payload) GetItems() []*Item {
+ if x != nil {
+ return x.Items
+ }
+ return nil
+}
+
+type Item struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
+}
+
+func (x *Item) Reset() {
+ *x = Item{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_payload_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Item) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Item) ProtoMessage() {}
+
+func (x *Item) ProtoReflect() protoreflect.Message {
+ mi := &file_payload_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Item.ProtoReflect.Descriptor instead.
+func (*Item) Descriptor() ([]byte, []int) {
+ return file_payload_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Item) GetKey() string {
+ if x != nil {
+ return x.Key
+ }
+ return ""
+}
+
+func (x *Item) GetValue() string {
+ if x != nil {
+ return x.Value
+ }
+ return ""
+}
+
+func (x *Item) GetTimeout() string {
+ if x != nil {
+ return x.Timeout
+ }
+ return ""
+}
+
+var File_payload_proto protoreflect.FileDescriptor
+
+var file_payload_proto_rawDesc = []byte{
+ 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
+ 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18,
+ 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d,
+ 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65,
+ 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d,
+ 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
+ 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
+ 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
+ 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_payload_proto_rawDescOnce sync.Once
+ file_payload_proto_rawDescData = file_payload_proto_rawDesc
+)
+
+func file_payload_proto_rawDescGZIP() []byte {
+ file_payload_proto_rawDescOnce.Do(func() {
+ file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData)
+ })
+ return file_payload_proto_rawDescData
+}
+
+var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_payload_proto_goTypes = []interface{}{
+ (*Payload)(nil), // 0: kv.Payload
+ (*Item)(nil), // 1: kv.Item
+}
+var file_payload_proto_depIdxs = []int32{
+ 1, // 0: kv.Payload.items:type_name -> kv.Item
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_payload_proto_init() }
+func file_payload_proto_init() {
+ if File_payload_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Payload); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Item); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_payload_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_payload_proto_goTypes,
+ DependencyIndexes: file_payload_proto_depIdxs,
+ MessageInfos: file_payload_proto_msgTypes,
+ }.Build()
+ File_payload_proto = out.File
+ file_payload_proto_rawDesc = nil
+ file_payload_proto_goTypes = nil
+ file_payload_proto_depIdxs = nil
+}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 2d4babbe..009a5c56 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,9 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -17,23 +16,21 @@ type rpc struct {
log logger.Logger
}
-// Has accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Has(in []byte, res *map[string]bool) error {
+// Has accept []*payload.Payload proto payload with Storage and Item
+func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
+ if in.Storage == "" {
+ return errors.E(op, errors.Str("no storage provided"))
+ }
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, utils.AsString(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[in.Storage]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Set accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Set(in []byte, ok *bool) error {
+// Set accept proto payload with Storage and Item
+func (r *rpc) Set(in *payload.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- items := make([]Item, 0, dataRoot.ItemsLength())
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- Value: string(tmpItem.Value()),
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.Set(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Set(in.GetItems()...)
if err != nil {
return err
}
@@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MGet accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+// MGet accept proto payload with Storage and Item
+func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
+ keys := make([]string, 0, len(in.GetItems()))
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MExpire accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MExpire(in []byte, ok *bool) error {
+// MExpire accept proto payload with Storage and Item
+func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- // when unmarshalling the keys, simultaneously, fill up the slice with items
- items := make([]Item, 0, l)
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- // we set up timeout on the keys, so, value here is redundant
- Value: "",
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.MExpire(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.MExpire(in.GetItems()...)
if err != nil {
return errors.E(op, err)
}
@@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// TTL accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+// TTL accept proto payload with Storage and Item
+func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Delete accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Delete(in []byte, ok *bool) error {
+// Delete accept proto payload with Storage and Item
+func (r *rpc) Delete(in *payload.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6732ff5d..d724dff9 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -6,7 +6,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -86,15 +86,19 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &message.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ for i := 0; i < len(m.GetTopics()); i++ {
// if we have active subscribers - send a message to a topic
// or send nil instead
- if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
- return fbsMsg, nil
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
}
}
return nil, nil
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 321bfaaa..76bef400 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -6,6 +6,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
@@ -65,7 +66,15 @@ func (fi *FanIn) read() {
if !ok {
return
}
- fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
+
+ m := &message.Message{}
+ err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("message unmarshal")
+ continue
+ }
+
+ fi.out <- m
case <-fi.exit:
return
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index b2603a40..695e7b08 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -9,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const PluginName = "redis"
@@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &message.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return errors.E(err)
+ }
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
return f.Err()
}
@@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ m := &message.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ p.log.Error("message unmarshal error")
+ return
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
- return
+ p.log.Error("redis publish", "error", f.Err())
}
}
}()
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 69aad7d4..951c9a1a 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -9,6 +9,7 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &pubsub.Message{}
+ msg := &message.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..39a4e139 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -25,7 +25,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -301,16 +301,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &message.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +325,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &message.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 544f3ede..efafb2d3 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,7 +1,6 @@
package pool
import (
- "bytes"
"sync"
"github.com/fasthttp/websocket"
@@ -9,14 +8,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
- bPool sync.Pool
log logger.Logger
queue chan *message.Message
@@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
- wp.bPool.New = func() interface{} {
- return new(bytes.Buffer)
- }
// start 10 workers
for i := 0; i < 50; i++ {
@@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
- b.Reset()
- wp.bPool.Put(b)
-}
-
-func (wp *WorkersPool) getBytes() *bytes.Buffer {
- return wp.bPool.Get().(*bytes.Buffer)
-}
-
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg == nil {
continue
}
- if msg.TopicsLength() == 0 {
+ if len(msg.GetTopics()) == 0 {
continue
}
- br, ok := wp.storage[utils.AsString(msg.Broker())]
+ br, ok := wp.storage[msg.Broker]
if !ok {
- wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
continue
}
res := wp.get()
- bb := wp.getBytes()
- for i := 0; i < msg.TopicsLength(); i++ {
+ for i := 0; i < len(msg.GetTopics()); i++ {
// get connections for the particular topic
- br.Connections(utils.AsString(msg.Topics(i)), res)
+ br.Connections(msg.GetTopics()[i], res)
}
if len(res) == 0 {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
}
- wp.putBytes(bb)
wp.put(res)
continue
}
@@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
@@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit
conn := c.(*connection.Connection)
// put data into the bytes buffer
- for i := 0; i < msg.PayloadLength(); i++ {
- bb.WriteByte(byte(msg.Payload(i)))
- }
- err := conn.Write(websocket.BinaryMessage, bb.Bytes())
+ err := conn.Write(websocket.BinaryMessage, msg.GetPayload())
if err != nil {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
}
- // put bytes buffer back
- wp.putBytes(bb)
// put map with results back
wp.put(res)
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 6c2cacb4..ef44884a 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,10 +1,10 @@
package websockets
import (
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -13,39 +13,32 @@ type rpc struct {
log logger.Logger
}
-// Publish ... msg is a flatbuffers decoded payload
+// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(msg []byte, ok *bool) error {
+func (r *rpc) Publish(in *message.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
+ r.log.Debug("message published", "msg", in.Messages)
- b := flatbuffers.NewBuilder(100)
+ msgLen := len(in.GetMessages())
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
-
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
- err := r.plugin.Publish(bb)
+ err = r.plugin.Publish(bb)
if err != nil {
*ok = false
- b.Reset()
return errors.E(op, err)
}
- b.Reset()
}
*ok = true
@@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
- r.log.Debug("message published", "msg", msg)
+func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error {
+ const op = errors.Op("publish_async")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
-
- b := flatbuffers.NewBuilder(100)
+ r.log.Debug("message published", "msg", in.Messages)
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
+ msgLen := len(in.GetMessages())
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
r.plugin.PublishAsync(bb)
- b.Reset()
}
*ok = true
return nil
}
-
-func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
- cmdOff := b.CreateByteString(msg.Command())
- brokerOff := b.CreateByteString(msg.Broker())
-
- offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
- for j := msg.TopicsLength() - 1; j >= 0; j-- {
- offsets[j] = b.CreateByteString(msg.Topics(j))
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- bb := make([]byte, msg.PayloadLength())
- for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
- }
- pOff := b.CreateByteVector(bb)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- fOff := message.MessageEnd(b)
- b.Finish(fOff)
-
- return b.FinishedBytes()
-}