diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 17 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 18 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 43 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 20 | ||||
-rw-r--r-- | plugins/kv/interface.go | 14 | ||||
-rw-r--r-- | plugins/kv/payload/generated/Item.go | 67 | ||||
-rw-r--r-- | plugins/kv/payload/generated/Payload.go | 71 | ||||
-rw-r--r-- | plugins/kv/payload/payload.fbs | 14 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 153 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 19 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 31 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 3 | ||||
-rw-r--r-- | plugins/websockets/memory/inMemory.go (renamed from plugins/memory/plugin.go) | 42 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 40 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 54 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 93 |
17 files changed, 225 insertions, 476 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 0f647cb1..ba873513 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -10,6 +10,7 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error { // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check // we do not need mutex here, since we use sync.Map - if items[i].TTL != "" { + if items[i].Timeout != "" { // check correctness of provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } // Store key TTL in the separate map - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } buf.Reset() @@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } return nil } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 02281ed5..8ea515d0 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,6 +6,7 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { - if items[i] == EmptyItem { + if items[i] == nil { return errors.E(op, errors.EmptyItem) } @@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error { } // add additional TTL in case of TTL isn't empty - if items[i].TTL != "" { + if items[i].Timeout != "" { // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index cde84f42..3997e0d4 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -9,8 +9,6 @@ import ( const PluginName = "memcached" -var EmptyItem = kv.Item{} - type Plugin struct { // config plugin cfgPlugin config.Configurer diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index c2494ee7..3158adee 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -6,6 +6,7 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(kv.Item).Value), nil + return utils.AsBytes(data.(*kvv1.Item).Value), nil } return nil, nil } @@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value + m[keys[i]] = value.(*kvv1.Item).Value } } return m, nil } -func (s *Driver) Set(items ...kv.Item) error { +func (s *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { + if items[i] == nil { + continue + } // TTL is set - if items[i].TTL != "" { + if items[i].Timeout != "" { // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...kv.Item) error { +func (s *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // if key exist, overwrite it value - if pItem, ok := s.heap.Load(items[i].Key); ok { + if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - tmp := pItem.(kv.Item) + tmp := pItem.(*kvv1.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, kv.Item{ - Key: items[i].Key, - Value: tmp.Value, - TTL: items[i].TTL, + s.heap.Store(items[i].Key, &kvv1.Item{ + Key: items[i].Key, + Value: tmp.Value, + Timeout: items[i].Timeout, }) } } @@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).TTL + m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil @@ -209,12 +216,12 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.TTL == "" { + v := value.(*kvv1.Item) + if v.Timeout == "" { return true } - t, err := time.Parse(time.RFC3339, v.TTL) + t, err := time.Parse(time.RFC3339, v.Timeout) if err != nil { return false } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index d0b541b2..0aaa6352 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -7,13 +7,12 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" ) -var EmptyItem = kv.Item{} - type Driver struct { universalClient redis.UniversalClient log logger.Logger @@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) } now := time.Now() for _, item := range items { - if item == EmptyItem { + if item == nil { return errors.E(op, errors.EmptyKey) } - if item.TTL == "" { + if item.Timeout == "" { err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() if err != nil { return err } } else { - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } @@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { - if item.TTL == "" || strings.TrimSpace(item.Key) == "" { + if item == nil { + continue + } + if item.Timeout == "" || strings.TrimSpace(item.Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 20dbb8b3..7841f9a2 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,14 +1,6 @@ package kv -// Item represents general storage item -type Item struct { - // Key of item - Key string - // Value of item - Value string - // live until time provided by TTL in RFC 3339 format - TTL string -} +import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -24,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...Item) error + Set(items ...*kvv1.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...Item) error + MExpire(items ...*kvv1.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go deleted file mode 100644 index 61bd6024..00000000 --- a/plugins/kv/payload/generated/Item.go +++ /dev/null @@ -1,67 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Item struct { - _tab flatbuffers.Table -} - -func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Item{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Item) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Item) Key() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Value() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Timeout() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func ItemStart(builder *flatbuffers.Builder) { - builder.StartObject(3) -} -func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0) -} -func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0) -} -func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0) -} -func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go deleted file mode 100644 index a2c6cfdb..00000000 --- a/plugins/kv/payload/generated/Payload.go +++ /dev/null @@ -1,71 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Payload struct { - _tab flatbuffers.Table -} - -func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Payload{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Payload) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Payload) Storage() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Payload) Items(obj *Item, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - x := rcv._tab.Vector(o) - x += flatbuffers.UOffsetT(j) * 4 - x = rcv._tab.Indirect(x) - obj.Init(rcv._tab.Bytes, x) - return true - } - return false -} - -func (rcv *Payload) ItemsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func PayloadStart(builder *flatbuffers.Builder) { - builder.StartObject(2) -} -func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0) -} -func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0) -} -func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs deleted file mode 100644 index 7e02c1a0..00000000 --- a/plugins/kv/payload/payload.fbs +++ /dev/null @@ -1,14 +0,0 @@ -namespace generated; - -table Payload { - Storage:string; - Items:[Item]; -} - -table Item { - Key:string; - Value:string; - Timeout:string; -} - -root_type Payload; diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 2d4babbe..a9efe0c4 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,9 +2,8 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) // Wrapper for the plugin @@ -17,23 +16,21 @@ type rpc struct { log logger.Logger } -// Has accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Has(in []byte, res *map[string]bool) error { +// Has accept []*payload.Payload proto payload with Storage and Item +func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) + if in.Storage == "" { + return errors.E(op, errors.Str("no storage provided")) + } - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, utils.AsString(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok { + if st, ok := r.storages[in.Storage]; ok { ret, err := st.Has(keys...) if err != nil { return err @@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Set accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Set(in []byte, ok *bool) error { +// Set accept proto payload with Storage and Item +func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_set") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - items := make([]Item, 0, dataRoot.ItemsLength()) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - Value: string(tmpItem.Value()), - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.Set(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.Set(in.GetItems()...) if err != nil { return err } @@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MGet accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { +// MGet accept proto payload with Storage and Item +func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} + keys := make([]string, 0, len(in.GetItems())) - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.MGet(keys...) if err != nil { return err @@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MExpire accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MExpire(in []byte, ok *bool) error { +// MExpire accept proto payload with Storage and Item +func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - // when unmarshalling the keys, simultaneously, fill up the slice with items - items := make([]Item, 0, l) - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - // we set up timeout on the keys, so, value here is redundant - Value: "", - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.MExpire(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.MExpire(in.GetItems()...) if err != nil { return errors.E(op, err) } @@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// TTL accept []byte flatbuffers payload with Storage and Item -func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { +// TTL accept proto payload with Storage and Item +func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.TTL(keys...) if err != nil { return err @@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Delete accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Delete(in []byte, ok *bool) error { +// Delete accept proto payload with Storage and Item +func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rcp_delete") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { err := st.Delete(keys...) if err != nil { return errors.E(op, err) @@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 321bfaaa..ac9ebcc2 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,9 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" "github.com/spiral/errors" @@ -22,13 +23,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *message.Message + out chan *websocketsv1.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *message.Message, 100) + out := make(chan *websocketsv1.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,7 +66,15 @@ func (fi *FanIn) read() { if !ok { return } - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) + + m := &websocketsv1.Message{} + err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("message unmarshal") + continue + } + + fi.out <- m case <-fi.exit: return } @@ -88,6 +97,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *websocketsv1.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index b2603a40..47ffeb39 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,10 +6,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const PluginName = "redis" @@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return errors.E(err) + } - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { return f.Err() } @@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + p.log.Error("message unmarshal error") + return + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) - return + p.log.Error("redis publish", "error", f.Err()) } } }() @@ -200,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { return <-p.fanin.consume(), nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 69aad7d4..e3d47166 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -8,6 +8,7 @@ import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" @@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - msg := &pubsub.Message{} + msg := &websocketsv1.Message{} err = json.Unmarshal(data, msg) if err != nil { diff --git a/plugins/memory/plugin.go b/plugins/websockets/memory/inMemory.go index 6732ff5d..cef28182 100644 --- a/plugins/memory/plugin.go +++ b/plugins/websockets/memory/inMemory.go @@ -4,13 +4,10 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - PluginName string = "memory" + "google.golang.org/protobuf/proto" ) type Plugin struct { @@ -23,19 +20,12 @@ type Plugin struct { storage bst.Storage } -func (p *Plugin) Init(log logger.Logger) error { - p.log = log - p.pushCh = make(chan []byte, 10) - p.storage = bst.NewBST() - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - return PluginName +func NewInMemory(log logger.Logger) pubsub.PubSub { + return &Plugin{ + log: log, + pushCh: make(chan []byte, 10), + storage: bst.NewBST(), + } } func (p *Plugin) Publish(message []byte) error { @@ -77,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } } -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -86,15 +76,19 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return nil, err + } // push only messages, which are subscribed // TODO better??? - for i := 0; i < fbsMsg.TopicsLength(); i++ { + for i := 0; i < len(m.GetTopics()); i++ { // if we have active subscribers - send a message to a topic // or send nil instead - if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok { - return fbsMsg, nil + if ok := p.storage.Contains(m.GetTopics()[i]); ok { + return m, nil } } return nil, nil diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 4c0edcad..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -23,9 +23,10 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const ( @@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + return err + } + // Get payload - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } return nil @@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + p.log.Error("message unmarshal") + } + + // Get payload + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { p.log.Error("publish async error", "error", err) - return } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } }() diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 544f3ede..1a7c6f8a 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,25 +1,22 @@ package pool import ( - "bytes" "sync" "github.com/fasthttp/websocket" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool - bPool sync.Pool log logger.Logger - queue chan *message.Message + queue chan *websocketsv1.Message exit chan struct{} } @@ -27,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *message.Message, 100), + queue: make(chan *websocketsv1.Message, 100), storage: pubsubs, log: log, exit: make(chan struct{}), @@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } - wp.bPool.New = func() interface{} { - return new(bytes.Buffer) - } // start 10 workers for i := 0; i < 50; i++ { @@ -48,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *message.Message) { +func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { wp.queue <- msg } @@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } -func (wp *WorkersPool) putBytes(b *bytes.Buffer) { - b.Reset() - wp.bPool.Put(b) -} - -func (wp *WorkersPool) getBytes() *bytes.Buffer { - return wp.bPool.Get().(*bytes.Buffer) -} - func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg == nil { continue } - if msg.TopicsLength() == 0 { + if len(msg.GetTopics()) == 0 { continue } - br, ok := wp.storage[utils.AsString(msg.Broker())] + br, ok := wp.storage[msg.Broker] if !ok { - wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) continue } res := wp.get() - bb := wp.getBytes() - for i := 0; i < msg.TopicsLength(); i++ { + for i := 0; i < len(msg.GetTopics()); i++ { // get connections for the particular topic - br.Connections(utils.AsString(msg.Topics(i)), res) + br.Connections(msg.GetTopics()[i], res) } if len(res) == 0 { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) } - wp.putBytes(bb) wp.put(res) continue } @@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } @@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit conn := c.(*connection.Connection) // put data into the bytes buffer - for i := 0; i < msg.PayloadLength(); i++ { - bb.WriteByte(byte(msg.Payload(i))) - } - err := conn.Write(websocket.BinaryMessage, bb.Bytes()) + err := conn.Write(websocket.BinaryMessage, msg.GetPayload()) if err != nil { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } } - // put bytes buffer back - wp.putBytes(bb) // put map with results back wp.put(res) case <-wp.exit: diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 6c2cacb4..00c1dd91 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,10 +1,10 @@ package websockets import ( - flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -13,39 +13,32 @@ type rpc struct { log logger.Logger } -// Publish ... msg is a flatbuffers decoded payload +// Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(msg []byte, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} + r.log.Debug("message published", "msg", in.Messages) - b := flatbuffers.NewBuilder(100) + msgLen := len(in.GetMessages()) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) - - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } - err := r.plugin.Publish(bb) + err = r.plugin.Publish(bb) if err != nil { *ok = false - b.Reset() return errors.E(op, err) } - b.Reset() } *ok = true @@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(msg []byte, ok *bool) error { - r.log.Debug("message published", "msg", msg) +func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { + const op = errors.Op("publish_async") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} - - b := flatbuffers.NewBuilder(100) + r.log.Debug("message published", "msg", in.Messages) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) + msgLen := len(in.GetMessages()) - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } r.plugin.PublishAsync(bb) - b.Reset() } *ok = true return nil } - -func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { - cmdOff := b.CreateByteString(msg.Command()) - brokerOff := b.CreateByteString(msg.Broker()) - - offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) - for j := msg.TopicsLength() - 1; j >= 0; j-- { - offsets[j] = b.CreateByteString(msg.Topics(j)) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - bb := make([]byte, msg.PayloadLength()) - for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) - } - pOff := b.CreateByteVector(bb) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - fOff := message.MessageEnd(b) - b.Finish(fOff) - - return b.FinishedBytes() -} |