summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go17
-rw-r--r--plugins/kv/drivers/memcached/driver.go18
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/driver.go43
-rw-r--r--plugins/kv/drivers/redis/driver.go20
-rw-r--r--plugins/kv/interface.go14
-rw-r--r--plugins/kv/payload/generated/Item.go67
-rw-r--r--plugins/kv/payload/generated/Payload.go71
-rw-r--r--plugins/kv/payload/payload.fbs14
-rw-r--r--plugins/kv/rpc.go153
-rw-r--r--plugins/redis/fanin.go19
-rw-r--r--plugins/redis/plugin.go31
-rw-r--r--plugins/websockets/executor/executor.go3
-rw-r--r--plugins/websockets/memory/inMemory.go (renamed from plugins/memory/plugin.go)42
-rw-r--r--plugins/websockets/plugin.go40
-rw-r--r--plugins/websockets/pool/workers_pool.go54
-rw-r--r--plugins/websockets/rpc.go93
17 files changed, 225 insertions, 476 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 0f647cb1..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error {
// if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
// we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
// Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
buf.Reset()
@@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
return nil
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 02281ed5..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,6 +6,7 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
- if items[i] == EmptyItem {
+ if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}
@@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error {
}
// add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index cde84f42..3997e0d4 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -9,8 +9,6 @@ import (
const PluginName = "memcached"
-var EmptyItem = kv.Item{}
-
type Plugin struct {
// config plugin
cfgPlugin config.Configurer
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index c2494ee7..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
+ if items[i] == nil {
+ continue
+ }
// TTL is set
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
+ if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(kv.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
+ s.heap.Store(items[i].Key, &kvv1.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ Timeout: items[i].Timeout,
})
}
}
@@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -209,12 +216,12 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
+ v := value.(*kvv1.Item)
+ if v.Timeout == "" {
return true
}
- t, err := time.Parse(time.RFC3339, v.TTL)
+ t, err := time.Parse(time.RFC3339, v.Timeout)
if err != nil {
return false
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index d0b541b2..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,13 +7,12 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-var EmptyItem = kv.Item{}
-
type Driver struct {
universalClient redis.UniversalClient
log logger.Logger
@@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
now := time.Now()
for _, item := range items {
- if item == EmptyItem {
+ if item == nil {
return errors.E(op, errors.EmptyKey)
}
- if item.TTL == "" {
+ if item.Timeout == "" {
err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
if err != nil {
return err
}
} else {
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
@@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
- if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ if item == nil {
+ continue
+ }
+ if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 20dbb8b3..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,14 +1,6 @@
package kv
-// Item represents general storage item
-type Item struct {
- // Key of item
- Key string
- // Value of item
- Value string
- // live until time provided by TTL in RFC 3339 format
- TTL string
-}
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -24,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
deleted file mode 100644
index 61bd6024..00000000
--- a/plugins/kv/payload/generated/Item.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Item struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Item{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Item) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Item) Key() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Value() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Timeout() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func ItemStart(builder *flatbuffers.Builder) {
- builder.StartObject(3)
-}
-func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
-}
-func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
-}
-func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
-}
-func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
deleted file mode 100644
index a2c6cfdb..00000000
--- a/plugins/kv/payload/generated/Payload.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Payload struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Payload{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Payload) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Payload) Storage() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Payload) Items(obj *Item, j int) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- x := rcv._tab.Vector(o)
- x += flatbuffers.UOffsetT(j) * 4
- x = rcv._tab.Indirect(x)
- obj.Init(rcv._tab.Bytes, x)
- return true
- }
- return false
-}
-
-func (rcv *Payload) ItemsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func PayloadStart(builder *flatbuffers.Builder) {
- builder.StartObject(2)
-}
-func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
-}
-func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
-}
-func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
deleted file mode 100644
index 7e02c1a0..00000000
--- a/plugins/kv/payload/payload.fbs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace generated;
-
-table Payload {
- Storage:string;
- Items:[Item];
-}
-
-table Item {
- Key:string;
- Value:string;
- Timeout:string;
-}
-
-root_type Payload;
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 2d4babbe..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,9 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -17,23 +16,21 @@ type rpc struct {
log logger.Logger
}
-// Has accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Has(in []byte, res *map[string]bool) error {
+// Has accept []*payload.Payload proto payload with Storage and Item
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
+ if in.Storage == "" {
+ return errors.E(op, errors.Str("no storage provided"))
+ }
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, utils.AsString(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[in.Storage]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Set accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Set(in []byte, ok *bool) error {
+// Set accept proto payload with Storage and Item
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- items := make([]Item, 0, dataRoot.ItemsLength())
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- Value: string(tmpItem.Value()),
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.Set(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Set(in.GetItems()...)
if err != nil {
return err
}
@@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MGet accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+// MGet accept proto payload with Storage and Item
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
+ keys := make([]string, 0, len(in.GetItems()))
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MExpire accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MExpire(in []byte, ok *bool) error {
+// MExpire accept proto payload with Storage and Item
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- // when unmarshalling the keys, simultaneously, fill up the slice with items
- items := make([]Item, 0, l)
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- // we set up timeout on the keys, so, value here is redundant
- Value: "",
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.MExpire(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.MExpire(in.GetItems()...)
if err != nil {
return errors.E(op, err)
}
@@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// TTL accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+// TTL accept proto payload with Storage and Item
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Delete accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Delete(in []byte, ok *bool) error {
+// Delete accept proto payload with Storage and Item
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 321bfaaa..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,9 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
@@ -22,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,7 +66,15 @@ func (fi *FanIn) read() {
if !ok {
return
}
- fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
+
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("message unmarshal")
+ continue
+ }
+
+ fi.out <- m
case <-fi.exit:
return
}
@@ -88,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index b2603a40..47ffeb39 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,10 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const PluginName = "redis"
@@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return errors.E(err)
+ }
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
return f.Err()
}
@@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ p.log.Error("message unmarshal error")
+ return
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
- return
+ p.log.Error("redis publish", "error", f.Err())
}
}
}()
@@ -200,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
return <-p.fanin.consume(), nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 69aad7d4..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,6 +8,7 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
@@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &pubsub.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/memory/plugin.go b/plugins/websockets/memory/inMemory.go
index 6732ff5d..cef28182 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/websockets/memory/inMemory.go
@@ -4,13 +4,10 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- PluginName string = "memory"
+ "google.golang.org/protobuf/proto"
)
type Plugin struct {
@@ -23,19 +20,12 @@ type Plugin struct {
storage bst.Storage
}
-func (p *Plugin) Init(log logger.Logger) error {
- p.log = log
- p.pushCh = make(chan []byte, 10)
- p.storage = bst.NewBST()
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- return PluginName
+func NewInMemory(log logger.Logger) pubsub.PubSub {
+ return &Plugin{
+ log: log,
+ pushCh: make(chan []byte, 10),
+ storage: bst.NewBST(),
+ }
}
func (p *Plugin) Publish(message []byte) error {
@@ -77,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -86,15 +76,19 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ for i := 0; i < len(m.GetTopics()); i++ {
// if we have active subscribers - send a message to a topic
// or send nil instead
- if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
- return fbsMsg, nil
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
}
}
return nil, nil
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -23,9 +23,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 544f3ede..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,25 +1,22 @@
package pool
import (
- "bytes"
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
- bPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -27,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
- wp.bPool.New = func() interface{} {
- return new(bytes.Buffer)
- }
// start 10 workers
for i := 0; i < 50; i++ {
@@ -48,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
@@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
- b.Reset()
- wp.bPool.Put(b)
-}
-
-func (wp *WorkersPool) getBytes() *bytes.Buffer {
- return wp.bPool.Get().(*bytes.Buffer)
-}
-
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg == nil {
continue
}
- if msg.TopicsLength() == 0 {
+ if len(msg.GetTopics()) == 0 {
continue
}
- br, ok := wp.storage[utils.AsString(msg.Broker())]
+ br, ok := wp.storage[msg.Broker]
if !ok {
- wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
continue
}
res := wp.get()
- bb := wp.getBytes()
- for i := 0; i < msg.TopicsLength(); i++ {
+ for i := 0; i < len(msg.GetTopics()); i++ {
// get connections for the particular topic
- br.Connections(utils.AsString(msg.Topics(i)), res)
+ br.Connections(msg.GetTopics()[i], res)
}
if len(res) == 0 {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
}
- wp.putBytes(bb)
wp.put(res)
continue
}
@@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
@@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit
conn := c.(*connection.Connection)
// put data into the bytes buffer
- for i := 0; i < msg.PayloadLength(); i++ {
- bb.WriteByte(byte(msg.Payload(i)))
- }
- err := conn.Write(websocket.BinaryMessage, bb.Bytes())
+ err := conn.Write(websocket.BinaryMessage, msg.GetPayload())
if err != nil {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
}
- // put bytes buffer back
- wp.putBytes(bb)
// put map with results back
wp.put(res)
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 6c2cacb4..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,10 +1,10 @@
package websockets
import (
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -13,39 +13,32 @@ type rpc struct {
log logger.Logger
}
-// Publish ... msg is a flatbuffers decoded payload
+// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(msg []byte, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
+ r.log.Debug("message published", "msg", in.Messages)
- b := flatbuffers.NewBuilder(100)
+ msgLen := len(in.GetMessages())
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
-
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
- err := r.plugin.Publish(bb)
+ err = r.plugin.Publish(bb)
if err != nil {
*ok = false
- b.Reset()
return errors.E(op, err)
}
- b.Reset()
}
*ok = true
@@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
- r.log.Debug("message published", "msg", msg)
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+ const op = errors.Op("publish_async")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
-
- b := flatbuffers.NewBuilder(100)
+ r.log.Debug("message published", "msg", in.Messages)
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
+ msgLen := len(in.GetMessages())
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
r.plugin.PublishAsync(bb)
- b.Reset()
}
*ok = true
return nil
}
-
-func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
- cmdOff := b.CreateByteString(msg.Command())
- brokerOff := b.CreateByteString(msg.Broker())
-
- offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
- for j := msg.TopicsLength() - 1; j >= 0; j-- {
- offsets[j] = b.CreateByteString(msg.Topics(j))
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- bb := make([]byte, msg.PayloadLength())
- for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
- }
- pOff := b.CreateByteVector(bb)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- fOff := message.MessageEnd(b)
- b.Finish(fOff)
-
- return b.FinishedBytes()
-}