summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/config.go25
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go208
-rw-r--r--plugins/broadcast/rpc.go87
-rw-r--r--plugins/kv/config.go2
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go2
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/interface.go15
-rw-r--r--plugins/kv/plugin.go60
-rw-r--r--plugins/kv/rpc.go2
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go4
-rw-r--r--plugins/memory/pubsub.go35
-rw-r--r--plugins/redis/channel.go97
-rw-r--r--plugins/redis/fanin.go102
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go6
-rw-r--r--plugins/redis/pubsub.go56
-rw-r--r--plugins/websockets/config.go71
-rw-r--r--plugins/websockets/executor/executor.go66
-rw-r--r--plugins/websockets/origin_test.go12
-rw-r--r--plugins/websockets/plugin.go190
-rw-r--r--plugins/websockets/pool/workers_pool.go90
-rw-r--r--plugins/websockets/rpc.go75
27 files changed, 641 insertions, 582 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..4f1e5213
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,25 @@
+package broadcast
+
+/*
+
+# Global redis config (priority - 2)
+
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ # local redis config (priority - 1)
+ test:
+ driver: memory
+
+
+priority local -> global
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..fd5ff1f9
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..46709d71
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) (pubsub.SubReader, error)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..6ddef806
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,208 @@
+package broadcast
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
+
+type Plugin struct {
+ sync.RWMutex
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.PubSub
+ constructors map[string]pubsub.Constructor
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.constructors = make(map[string]pubsub.Constructor)
+
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) {
+ // key redis, value - interface
+ p.constructors[name.Name()] = constructor
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m *pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ p.log.Error("publishAsync", "error", err)
+ // continue publish to other registered publishers
+ continue
+ }
+ }
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+ const op = errors.Op("broadcast_plugin_get_driver")
+
+ // choose a driver
+ if val, ok := p.cfg.Data[key]; ok {
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := val.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val)))
+ }
+ default:
+ return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, key)
+
+ switch val.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.constructors[memory]; !ok {
+ return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
+ }
+ ps, err := p.constructors[memory].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
+ p.publishers[uuid.NewString()] = ps
+
+ return ps, nil
+ case redis:
+ if _, ok := p.constructors[redis]; !ok {
+ return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+
+ // then try global if local does not exist
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+ }
+ }
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..2ee211f8
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,87 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: root/proto
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.String())
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ err := r.plugin.Publish(tmp)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: root/proto
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.GetMessages())
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ r.plugin.PublishAsync(tmp)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
diff --git a/plugins/kv/config.go b/plugins/kv/config.go
index 66095817..09ba79cd 100644
--- a/plugins/kv/config.go
+++ b/plugins/kv/config.go
@@ -1,6 +1,6 @@
package kv
-// Config represents general storage configuration with keys as the user defined kv-names and values as the drivers
+// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors
type Config struct {
Data map[string]interface{} `mapstructure:"kv"`
}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 5f4d98b1..4b675271 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,10 +9,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 28e2a89c..6ae1a1f6 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index c1f79cbb..a2787d72 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,10 +6,10 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 936b2047..22ea5cca 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 5aedd5c3..ffdbbe62 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
+import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -29,13 +29,8 @@ type Storage interface {
Delete(keys ...string) error
}
-// StorageDriver interface provide storage
-type StorageDriver interface {
- Provider
-}
-
-// Provider provides storage based on the config
-type Provider interface {
- // Provide provides Storage based on the config key
- KVProvide(key string) (Storage, error)
+// Constructor provides storage based on the config
+type Constructor interface {
+ // KVConstruct provides Storage based on the config key
+ KVConstruct(key string) (Storage, error)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index efe92252..03dbaed6 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -24,8 +24,8 @@ const (
// Plugin for the unified storage
type Plugin struct {
log logger.Logger
- // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
- drivers map[string]StorageDriver
+ // constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
+ constructors map[string]Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
storages map[string]Storage
// KV configuration
@@ -43,7 +43,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.drivers = make(map[string]StorageDriver, 5)
+ p.constructors = make(map[string]Constructor, 5)
p.storages = make(map[string]Storage, 5)
p.log = log
p.cfgPlugin = cfg
@@ -81,13 +81,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
addr: [ "localhost:11211" ]
- For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
when user requests for example boltdb-south, we should provide that particular preconfigured storage
*/
for k, v := range p.cfg.Data {
- if _, ok := v.(map[string]interface{})[driver]; !ok {
- errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
- return errCh
+ // for example if the key not properly formatted (yaml)
+ if v == nil {
+ continue
+ }
+
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
}
// config key for the particular sub-driver kv.memcached
@@ -95,12 +109,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
- if _, ok := p.drivers[memcached]; !ok {
- p.log.Warn("no memcached drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memcached]; !ok {
+ p.log.Warn("no memcached constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memcached].KVProvide(configKey)
+ storage, err := p.constructors[memcached].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -110,12 +124,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.storages[k] = storage
case boltdb:
- if _, ok := p.drivers[boltdb]; !ok {
- p.log.Warn("no boltdb drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[boltdb]; !ok {
+ p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[boltdb].KVProvide(configKey)
+ storage, err := p.constructors[boltdb].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -124,12 +138,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case memory:
- if _, ok := p.drivers[memory]; !ok {
- p.log.Warn("no in-memory drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memory]; !ok {
+ p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memory].KVProvide(configKey)
+ storage, err := p.constructors[memory].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -138,15 +152,15 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case redis:
- if _, ok := p.drivers[redis]; !ok {
- p.log.Warn("no redis drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[redis]; !ok {
+ p.log.Warn("no redis constructors registered", "registered", p.constructors)
continue
}
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -155,7 +169,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case p.cfgPlugin.Has(redis):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -189,9 +203,9 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) {
- // save the storage driver
- p.drivers[name.Name()] = storage
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+ // save the storage constructor
+ p.constructors[name.Name()] = constructor
}
// RPC returns associated rpc service.
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index ab1f7f31..af763600 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,8 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
// Wrapper for the plugin
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 9b7d7259..1cf031d1 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,10 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6151ebf0..70badf15 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -41,11 +41,11 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key)
}
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
if err != nil {
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 75cd9d24..d027a8a5 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -4,16 +4,14 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
sync.RWMutex
// channel with the messages from the RPC
- pushCh chan []byte
+ pushCh chan *pubsub.Message
// user-subscribed topics
storage bst.Storage
log logger.Logger
@@ -21,21 +19,21 @@ type PubSubDriver struct {
func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
ps := &PubSubDriver{
- pushCh: make(chan []byte, 10),
+ pushCh: make(chan *pubsub.Message, 10),
storage: bst.NewBST(),
log: log,
}
return ps, nil
}
-func (p *PubSubDriver) Publish(message []byte) error {
- p.pushCh <- message
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
+ p.pushCh <- msg
return nil
}
-func (p *PubSubDriver) PublishAsync(message []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
- p.pushCh <- message
+ p.pushCh <- msg
}()
}
@@ -67,7 +65,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,20 +74,13 @@ func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return nil, err
- }
-
- // push only messages, which are subscribed
+ // push only messages, which topics are subscibed
// TODO better???
- for i := 0; i < len(m.GetTopics()); i++ {
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(m.GetTopics()[i]); ok {
- return m, nil
- }
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
}
+
return nil, nil
}
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
new file mode 100644
index 00000000..5817853c
--- /dev/null
+++ b/plugins/redis/channel.go
@@ -0,0 +1,97 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type redisChannel struct {
+ sync.Mutex
+
+ // redis client
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan *pubsub.Message
+
+ exit chan struct{}
+}
+
+func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel {
+ out := make(chan *pubsub.Message, 100)
+ fi := &redisChannel{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (r *redisChannel) sub(topics ...string) error {
+ const op = errors.Op("redis_sub")
+ err := r.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (r *redisChannel) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the pubsub.Message structure
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-r.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+
+ r.out <- &pubsub.Message{
+ Topic: msg.Channel,
+ Payload: utils.AsBytes(msg.Payload),
+ }
+
+ case <-r.exit:
+ return
+ }
+ }
+}
+
+func (r *redisChannel) unsub(topic string) error {
+ const op = errors.Op("redis_unsub")
+ err := r.pubsub.Unsubscribe(context.Background(), topic)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (r *redisChannel) stop() error {
+ r.exit <- struct{}{}
+ close(r.out)
+ close(r.exit)
+ return nil
+}
+
+func (r *redisChannel) message() *pubsub.Message {
+ return <-r.out
+}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
deleted file mode 100644
index ac9ebcc2..00000000
--- a/plugins/redis/fanin.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package redis
-
-import (
- "context"
- "sync"
-
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type FanIn struct {
- sync.Mutex
-
- // redis client
- client redis.UniversalClient
- pubsub *redis.PubSub
-
- log logger.Logger
-
- // out channel with all subs
- out chan *websocketsv1.Message
-
- exit chan struct{}
-}
-
-func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *websocketsv1.Message, 100)
- fi := &FanIn{
- out: out,
- client: redisClient,
- pubsub: redisClient.Subscribe(context.Background()),
- exit: make(chan struct{}),
- log: log,
- }
-
- // start reading messages
- go fi.read()
-
- return fi
-}
-
-func (fi *FanIn) sub(topics ...string) error {
- const op = errors.Op("fanin_addchannel")
- err := fi.pubsub.Subscribe(context.Background(), topics...)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// read reads messages from the pubsub subscription
-func (fi *FanIn) read() {
- for {
- select {
- // here we receive message from us (which we sent before in Publish)
- // it should be compatible with the websockets.Msg interface
- // payload should be in the redis.message.payload field
-
- case msg, ok := <-fi.pubsub.Channel():
- // channel closed
- if !ok {
- return
- }
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("message unmarshal")
- continue
- }
-
- fi.out <- m
- case <-fi.exit:
- return
- }
- }
-}
-
-func (fi *FanIn) unsub(topic string) error {
- const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topic)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (fi *FanIn) stop() error {
- fi.exit <- struct{}{}
- close(fi.out)
- close(fi.exit)
- return nil
-}
-
-func (fi *FanIn) consume() <-chan *websocketsv1.Message {
- return fi.out
-}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 66cb8384..320b7443 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,10 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24c21b55..9d98790b 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -59,8 +59,8 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-// KVProvide provides KV storage implementation over the redis plugin
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+// KVConstruct provides KV storage implementation over the redis plugin
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
@@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
return st, nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index dbda7ea4..4e41acb5 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,11 +6,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
type PubSubDriver struct {
@@ -18,7 +16,7 @@ type PubSubDriver struct {
cfg *Config `mapstructure:"redis"`
log logger.Logger
- fanin *FanIn
+ channel *redisChannel
universalClient redis.UniversalClient
stopCh chan struct{}
}
@@ -62,7 +60,12 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
MasterName: ps.cfg.MasterName,
})
- ps.fanin = newFanIn(ps.universalClient, log)
+ statusCmd := ps.universalClient.Ping(context.Background())
+ if statusCmd.Err() != nil {
+ return nil, statusCmd.Err()
+ }
+
+ ps.channel = newRedisChannel(ps.universalClient, log)
ps.stop()
@@ -72,47 +75,32 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
func (p *PubSubDriver) stop() {
go func() {
for range p.stopCh {
- _ = p.fanin.stop()
+ _ = p.channel.stop()
return
}
}()
}
-func (p *PubSubDriver) Publish(msg []byte) error {
+func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ return f.Err()
}
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
return nil
}
-func (p *PubSubDriver) PublishAsync(msg []byte) {
+func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
+ f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
}
}()
}
@@ -128,13 +116,13 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
return err
}
if res == 0 {
- p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i])
continue
}
}
// and subscribe after
- return p.fanin.sub(topics...)
+ return p.channel.sub(topics...)
}
func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
@@ -160,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error
}
// else - unsubscribe
- err = p.fanin.unsub(topics[i])
+ err = p.channel.unsub(topics[i])
if err != nil {
return err
}
@@ -176,7 +164,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
panic(err)
}
- // assighn connections
+ // assign connections
// res expected to be from the sync.Pool
for k := range r {
res[k] = struct{}{}
@@ -184,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
- return <-p.fanin.consume(), nil
+func (p *PubSubDriver) Next() (*pubsub.Message, error) {
+ return p.channel.message(), nil
}
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index deb4406c..933a12e0 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -4,80 +4,42 @@ import (
"strings"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
)
/*
-# GLOBAL
-redis:
- addrs:
- - 'localhost:6379'
-
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
-
- pubsubs:["redis", "amqp", "memory"]
- # OR local
- redis:
- addrs:
- - 'localhost:6379'
-
- # path used as websockets path
+ broker: default
+ allowed_origin: "*"
path: "/ws"
*/
-type RedisConfig struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
- Path string `mapstructure:"path"`
- // ["redis", "amqp", "memory"]
- PubSubs []string `mapstructure:"pubsubs"`
- Middleware []string `mapstructure:"middleware"`
-
+ Path string `mapstructure:"path"`
AllowedOrigin string `mapstructure:"allowed_origin"`
+ Broker string `mapstructure:"broker"`
// wildcard origin
allowedWOrigins []wildcard
allowedOrigins []string
allowedAll bool
- Redis *RedisConfig `mapstructure:"redis"`
- Pool *pool.Config `mapstructure:"pool"`
+ // Pool with the workers for the websockets
+ Pool *pool.Config `mapstructure:"pool"`
}
// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
+func (c *Config) InitDefault() error {
if c.Path == "" {
c.Path = "/ws"
}
- if len(c.PubSubs) == 0 {
- // memory used by default
- c.PubSubs = append(c.PubSubs, "memory")
+ // broker is mandatory
+ if c.Broker == "" {
+ return errors.Str("broker key should be specified")
}
if c.Pool == nil {
@@ -99,13 +61,6 @@ func (c *Config) InitDefault() {
}
}
- if c.Redis != nil {
- if c.Redis.Addrs == nil {
- // append default
- c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
- }
- }
-
if c.AllowedOrigin == "" {
c.AllowedOrigin = "*"
}
@@ -115,7 +70,7 @@ func (c *Config) InitDefault() {
if origin == "*" {
// If "*" is present in the list, turn the whole list into a match all
c.allowedAll = true
- return
+ return nil
} else if i := strings.IndexByte(origin, '*'); i >= 0 {
// Split the origin in two: start and end string without the *
w := wildcard{origin[0:i], origin[i+1:]}
@@ -123,4 +78,6 @@ func (c *Config) InitDefault() {
} else {
c.allowedOrigins = append(c.allowedOrigins, origin)
}
+
+ return nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 5f904d26..664b4dfd 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,12 +7,12 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
type Response struct {
@@ -22,14 +22,15 @@ type Response struct {
type Executor struct {
sync.Mutex
+ // raw ws connection
conn *connection.Connection
log logger.Logger
// associated connection ID
connID string
- // map with the pubsub drivers
- pubsub map[string]pubsub.PubSub
+ // subscriber drivers
+ sub pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,12 +39,12 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
log: log,
- pubsub: pubsubs,
+ sub: sub,
accessValidator: av,
actualTopics: make(map[string]struct{}, 10),
req: r,
@@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
err = json.Unmarshal(data, msg)
if err != nil {
- e.log.Error("error unmarshal message", "error", err)
+ e.log.Error("unmarshal message", "error", err)
continue
}
// nil message, continue
if msg == nil {
- e.log.Warn("get nil message, skipping")
+ e.log.Warn("nil message, skipping")
continue
}
switch msg.Command {
// handle leave
case commands.Join:
- e.log.Debug("get join command", "msg", msg)
+ e.log.Debug("received join command", "msg", msg)
val, err := e.accessValidator(e.req, msg.Topics...)
if err != nil {
@@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, errJ := json.Marshal(resp)
if errJ != nil {
- e.log.Error("error marshal the body", "error", errJ)
+ e.log.Error("marshal the body", "error", errJ)
return errors.E(op, fmt.Errorf("%v,%v", err, errJ))
}
errW := e.conn.Write(packet)
if errW != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", errW)
+ e.log.Error("write payload to the connection", "payload", packet, "error", errW)
return errors.E(op, fmt.Errorf("%v,%v", err, errW))
}
@@ -115,27 +116,25 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
// subscribe to the topic
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Set(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Set(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
// handle leave
case commands.Leave:
- e.log.Debug("get leave command", "msg", msg)
+ e.log.Debug("received leave command", "msg", msg)
// prepare response
resp := &Response{
@@ -145,21 +144,19 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Leave(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Leave(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
case commands.Headers:
@@ -170,13 +167,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Set(topics []string) error {
// associate connection with topics
- err := br.Subscribe(e.connID, topics...)
+ err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(e.connID, topics...)
+ _ = e.sub.Unsubscribe(e.connID, topics...)
return err
}
@@ -188,11 +185,11 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
- err := br.Unsubscribe(e.connID, topics...)
+ err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
return err
}
@@ -207,10 +204,7 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
func (e *Executor) CleanUp() {
// unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // here
- for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(e.connID, topic)
- }
+ _ = e.sub.Unsubscribe(e.connID, topic)
}
// clean up the actualTopics data
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
index e877fad3..bbc49bbb 100644
--- a/plugins/websockets/origin_test.go
+++ b/plugins/websockets/origin_test.go
@@ -9,9 +9,11 @@ import (
func TestConfig_Origin(t *testing.T) {
cfg := &Config{
AllowedOrigin: "*",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg))
assert.True(t, isOriginAllowed("http://", cfg))
@@ -27,9 +29,11 @@ func TestConfig_Origin(t *testing.T) {
func TestConfig_OriginWildCard(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://*my.site.com",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
@@ -48,9 +52,11 @@ func TestConfig_OriginWildCard(t *testing.T) {
func TestConfig_OriginWildCard2(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://my.*.com",
+ Broker: "any",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..ca5f2f59 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -10,14 +9,13 @@ import (
"github.com/fasthttp/websocket"
"github.com/google/uuid"
json "github.com/json-iterator/go"
- endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -26,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -35,14 +32,14 @@ const (
type Plugin struct {
sync.RWMutex
- // Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ // subscriber+reader interfaces
+ subReader pubsub.SubReader
+ // broadcaster
+ broadcaster broadcast.Broadcaster
- cfg *Config
- cfgPlugin config.Configurer
- log logger.Logger
+ cfg *Config
+ log logger.Logger
// global connections map
connections sync.Map
@@ -53,14 +50,16 @@ type Plugin struct {
wsUpgrade *websocket.Upgrader
serveExit chan struct{}
+ // workers pool
phpPool phpPool.Pool
- server server.Server
+ // server which produces commands to the pool
+ server server.Server
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -71,12 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
-
- p.log = log
- p.cfgPlugin = cfg
+ err = p.cfg.InitDefault()
+ if err != nil {
+ return errors.E(op, err)
+ }
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
@@ -88,15 +85,17 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.serveExit = make(chan struct{})
p.server = server
-
+ p.log = log
+ p.broadcaster = b
return nil
}
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
-
- err := p.initPubSubs()
+ errCh := make(chan error, 1)
+ // init broadcaster
+ var err error
+ p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -122,76 +121,26 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
-
- // run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
- for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- p.workersPool.Queue(data)
- }
- }
- }(v)
- }
-
- return errCh
-}
-
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
+ p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
+ // we need here only Reader part of the interface
+ go func(ps pubsub.Reader) {
+ for {
+ select {
+ case <-p.serveExit:
+ return
+ default:
+ data, err := ps.Next()
if err != nil {
- return err
+ errCh <- err
+ return
}
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+ p.workersPool.Queue(data)
}
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
}
- }
+ }(p.subReader)
- return nil
+ return errCh
}
func (p *Plugin) Stop() error {
@@ -208,30 +157,12 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.GetPublishers,
- }
-}
-
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
-}
-
func (p *Plugin) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != p.cfg.Path {
@@ -277,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +292,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index a196d1f0..752ba3ce 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,6 @@ import (
"sync"
json "github.com/json-iterator/go"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -12,21 +11,21 @@ import (
)
type WorkersPool struct {
- storage map[string]pubsub.PubSub
+ subscriber pubsub.Subscriber
connections *sync.Map
resPool sync.Pool
log logger.Logger
- queue chan *websocketsv1.Message
+ queue chan *pubsub.Message
exit chan struct{}
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *websocketsv1.Message, 100),
- storage: pubsubs,
+ queue: make(chan *pubsub.Message, 100),
+ subscriber: subscriber,
log: log,
exit: make(chan struct{}),
}
@@ -43,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
+func (wp *WorkersPool) Queue(msg *pubsub.Message) {
wp.queue <- msg
}
@@ -83,63 +82,48 @@ func (wp *WorkersPool) do() { //nolint:gocognit
return
}
_ = msg
- if msg == nil {
- continue
- }
- if len(msg.GetTopics()) == 0 {
+ if msg == nil || msg.Topic == "" {
continue
}
- br, ok := wp.storage[msg.Broker]
- if !ok {
- wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
+ // get free map
+ res := wp.get()
+
+ // get connections for the particular topic
+ wp.subscriber.Connections(msg.Topic, res)
+
+ if len(res) == 0 {
+ wp.log.Info("no connections associated with provided topic", "topic", msg.Topic)
+ wp.put(res)
continue
}
- // send a message to every topic
- for i := 0; i < len(msg.GetTopics()); i++ {
- // get free map
- res := wp.get()
+ // res is a map with a connectionsID
+ for connID := range res {
+ c, ok := wp.connections.Load(connID)
+ if !ok {
+ wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic)
+ wp.put(res)
+ continue
+ }
- // get connections for the particular topic
- br.Connections(msg.GetTopics()[i], res)
+ d, err := json.Marshal(&Response{
+ Topic: msg.Topic,
+ Payload: utils.AsString(msg.Payload),
+ })
- if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
+ if err != nil {
+ wp.log.Error("error marshaling response", "error", err)
wp.put(res)
- continue
+ break
}
- // res is a map with a connectionsID
- for topic := range res {
- c, ok := wp.connections.Load(topic)
- if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
- wp.put(res)
- continue
- }
-
- response := &Response{
- Topic: msg.GetTopics()[i],
- Payload: utils.AsString(msg.GetPayload()),
- }
-
- d, err := json.Marshal(response)
- if err != nil {
- wp.log.Error("error marshaling response", "error", err)
- wp.put(res)
- break
- }
-
- // put data into the bytes buffer
- err = c.(*connection.Connection).Write(d)
- if err != nil {
- for i := 0; i < len(msg.GetTopics()); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
- }
- wp.put(res)
- continue
- }
+ // put data into the bytes buffer
+ err = c.(*connection.Connection).Write(d)
+ if err != nil {
+ wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic)
+ wp.put(res)
+ continue
}
}
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
deleted file mode 100644
index 341e0b2a..00000000
--- a/plugins/websockets/rpc.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package websockets
-
-import (
- "github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-)
-
-// rpc collectors struct
-type rpc struct {
- plugin *Plugin
- log logger.Logger
-}
-
-// Publish ... msg is a proto decoded payload
-// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("broadcast_publish")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- return errors.E(op, err)
- }
-
- err = r.plugin.Publish(bb)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
- }
-
- out.Ok = true
- return nil
-}
-
-// PublishAsync ...
-// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("publish_async")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
-
- r.plugin.PublishAsync(bb)
- }
-
- out.Ok = true
- return nil
-}