diff options
Diffstat (limited to 'plugins')
27 files changed, 641 insertions, 582 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go new file mode 100644 index 00000000..4f1e5213 --- /dev/null +++ b/plugins/broadcast/config.go @@ -0,0 +1,25 @@ +package broadcast + +/* + +# Global redis config (priority - 2) + +websockets: # <----- one of possible subscribers + path: /ws + broker: default # <------ broadcast broker to use --------------- | + | match +broadcast: # <-------- broadcast entry point plugin | + default: # <----------------------------------------------------- | + driver: redis + # local redis config (priority - 1) + test: + driver: memory + + +priority local -> global +*/ + +// Config ... +type Config struct { + Data map[string]interface{} `mapstructure:"broadcast"` +} diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio new file mode 100644 index 00000000..fd5ff1f9 --- /dev/null +++ b/plugins/broadcast/doc/broadcast_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go new file mode 100644 index 00000000..46709d71 --- /dev/null +++ b/plugins/broadcast/interface.go @@ -0,0 +1,7 @@ +package broadcast + +import "github.com/spiral/roadrunner/v2/pkg/pubsub" + +type Broadcaster interface { + GetDriver(key string) (pubsub.SubReader, error) +} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go new file mode 100644 index 00000000..6ddef806 --- /dev/null +++ b/plugins/broadcast/plugin.go @@ -0,0 +1,208 @@ +package broadcast + +import ( + "fmt" + "sync" + + "github.com/google/uuid" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "broadcast" + // driver is the mandatory field which should present in every storage + driver string = "driver" + + redis string = "redis" + memory string = "memory" +) + +type Plugin struct { + sync.RWMutex + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger + // publishers implement Publisher interface + // and able to receive a payload + publishers map[string]pubsub.PubSub + constructors map[string]pubsub.Constructor +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("broadcast_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + p.cfg = &Config{} + // unmarshal config section + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + + p.publishers = make(map[string]pubsub.PubSub) + p.constructors = make(map[string]pubsub.Constructor) + + p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + return make(chan error) +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectPublishers, + } +} + +// CollectPublishers collect all plugins who implement pubsub.Publisher interface +func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) { + // key redis, value - interface + p.constructors[name.Name()] = constructor +} + +// Publish is an entry point to the websocket PUBSUB +func (p *Plugin) Publish(m *pubsub.Message) error { + p.Lock() + defer p.Unlock() + + const op = errors.Op("broadcast_plugin_publish") + + // check if any publisher registered + if len(p.publishers) > 0 { + for j := range p.publishers { + err := p.publishers[j].Publish(m) + if err != nil { + return errors.E(op, err) + } + } + return nil + } else { + p.log.Warn("no publishers registered") + } + + return nil +} + +func (p *Plugin) PublishAsync(m *pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + // check if any publisher registered + if len(p.publishers) > 0 { + for j := range p.publishers { + err := p.publishers[j].Publish(m) + if err != nil { + p.log.Error("publishAsync", "error", err) + // continue publish to other registered publishers + continue + } + } + } else { + p.log.Warn("no publishers registered") + } + }() +} + +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit + const op = errors.Op("broadcast_plugin_get_driver") + + // choose a driver + if val, ok := p.cfg.Data[key]; ok { + // check type of the v + // should be a map[string]interface{} + switch t := val.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val))) + } + default: + return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, key) + + switch val.(map[string]interface{})[driver] { + case memory: + if _, ok := p.constructors[memory]; !ok { + return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) + } + ps, err := p.constructors[memory].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // save the initialized publisher channel + // for the in-memory, register new publishers + p.publishers[uuid.NewString()] = ps + + return ps, nil + case redis: + if _, ok := p.constructors[redis]; !ok { + return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + ps, err := p.constructors[redis].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection + p.publishers[configKey] = ps + return ps, nil + + // then try global if local does not exist + case p.cfgPlugin.Has(redis): + ps, err := p.constructors[redis].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection + p.publishers[configKey] = ps + return ps, nil + } + } + } + return nil, errors.E(op, errors.Str("could not find driver by provided key")) +} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + plugin: p, + log: p.log, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go new file mode 100644 index 00000000..2ee211f8 --- /dev/null +++ b/plugins/broadcast/rpc.go @@ -0,0 +1,87 @@ +package broadcast + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" +) + +// rpc collectors struct +type rpc struct { + plugin *Plugin + log logger.Logger +} + +// Publish ... msg is a proto decoded payload +// see: root/proto +func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { + const op = errors.Op("broadcast_publish") + + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.String()) + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } + + err := r.plugin.Publish(tmp) + if err != nil { + out.Ok = false + return errors.E(op, err) + } + } + } + + out.Ok = true + return nil +} + +// PublishAsync ... +// see: root/proto +func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.GetMessages()) + + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } + + r.plugin.PublishAsync(tmp) + } + } + + out.Ok = true + return nil +} diff --git a/plugins/kv/config.go b/plugins/kv/config.go index 66095817..09ba79cd 100644 --- a/plugins/kv/config.go +++ b/plugins/kv/config.go @@ -1,6 +1,6 @@ package kv -// Config represents general storage configuration with keys as the user defined kv-names and values as the drivers +// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors type Config struct { Data map[string]interface{} `mapstructure:"kv"` } diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 5f4d98b1..4b675271 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,10 +9,10 @@ import ( "time" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" ) diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 28e2a89c..6ae1a1f6 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -46,7 +46,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) KVProvide(key string) (kv.Storage, error) { +func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index c1f79cbb..a2787d72 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,10 +6,10 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) type Driver struct { diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 936b2047..22ea5cca 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -34,7 +34,7 @@ func (s *Plugin) Name() string { // Available interface implementation func (s *Plugin) Available() {} -func (s *Plugin) KVProvide(key string) (kv.Storage, error) { +func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 5aedd5c3..ffdbbe62 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,6 +1,6 @@ package kv -import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" +import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -29,13 +29,8 @@ type Storage interface { Delete(keys ...string) error } -// StorageDriver interface provide storage -type StorageDriver interface { - Provider -} - -// Provider provides storage based on the config -type Provider interface { - // Provide provides Storage based on the config key - KVProvide(key string) (Storage, error) +// Constructor provides storage based on the config +type Constructor interface { + // KVConstruct provides Storage based on the config key + KVConstruct(key string) (Storage, error) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index efe92252..03dbaed6 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -24,8 +24,8 @@ const ( // Plugin for the unified storage type Plugin struct { log logger.Logger - // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. - drivers map[string]StorageDriver + // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. + constructors map[string]Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. storages map[string]Storage // KV configuration @@ -43,7 +43,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.drivers = make(map[string]StorageDriver, 5) + p.constructors = make(map[string]Constructor, 5) p.storages = make(map[string]Storage, 5) p.log = log p.cfgPlugin = cfg @@ -81,13 +81,27 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit addr: [ "localhost:11211" ] - For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached when user requests for example boltdb-south, we should provide that particular preconfigured storage */ for k, v := range p.cfg.Data { - if _, ok := v.(map[string]interface{})[driver]; !ok { - errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) - return errCh + // for example if the key not properly formatted (yaml) + if v == nil { + continue + } + + // check type of the v + // should be a map[string]interface{} + switch t := v.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + default: + p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") + continue } // config key for the particular sub-driver kv.memcached @@ -95,12 +109,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // at this point we know, that driver field present in the configuration switch v.(map[string]interface{})[driver] { case memcached: - if _, ok := p.drivers[memcached]; !ok { - p.log.Warn("no memcached drivers registered", "registered", p.drivers) + if _, ok := p.constructors[memcached]; !ok { + p.log.Warn("no memcached constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[memcached].KVProvide(configKey) + storage, err := p.constructors[memcached].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -110,12 +124,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.storages[k] = storage case boltdb: - if _, ok := p.drivers[boltdb]; !ok { - p.log.Warn("no boltdb drivers registered", "registered", p.drivers) + if _, ok := p.constructors[boltdb]; !ok { + p.log.Warn("no boltdb constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[boltdb].KVProvide(configKey) + storage, err := p.constructors[boltdb].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -124,12 +138,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case memory: - if _, ok := p.drivers[memory]; !ok { - p.log.Warn("no in-memory drivers registered", "registered", p.drivers) + if _, ok := p.constructors[memory]; !ok { + p.log.Warn("no in-memory constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[memory].KVProvide(configKey) + storage, err := p.constructors[memory].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -138,15 +152,15 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case redis: - if _, ok := p.drivers[redis]; !ok { - p.log.Warn("no redis drivers registered", "registered", p.drivers) + if _, ok := p.constructors[redis]; !ok { + p.log.Warn("no redis constructors registered", "registered", p.constructors) continue } // first - try local configuration switch { case p.cfgPlugin.Has(configKey): - storage, err := p.drivers[redis].KVProvide(configKey) + storage, err := p.constructors[redis].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -155,7 +169,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case p.cfgPlugin.Has(redis): - storage, err := p.drivers[redis].KVProvide(configKey) + storage, err := p.constructors[redis].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -189,9 +203,9 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { - // save the storage driver - p.drivers[name.Name()] = storage +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { + // save the storage constructor + p.constructors[name.Name()] = constructor } // RPC returns associated rpc service. diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index ab1f7f31..af763600 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,8 +2,8 @@ package kv import ( "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) // Wrapper for the plugin diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index 9b7d7259..1cf031d1 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,10 +6,10 @@ import ( "time" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) type Driver struct { diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 6151ebf0..70badf15 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -41,11 +41,11 @@ func (p *Plugin) Stop() error { return nil } -func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { +func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { return NewPubSubDriver(p.log, key) } -func (p *Plugin) KVProvide(key string) (kv.Storage, error) { +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) if err != nil { diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index 75cd9d24..d027a8a5 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -4,16 +4,14 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" ) type PubSubDriver struct { sync.RWMutex // channel with the messages from the RPC - pushCh chan []byte + pushCh chan *pubsub.Message // user-subscribed topics storage bst.Storage log logger.Logger @@ -21,21 +19,21 @@ type PubSubDriver struct { func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) { ps := &PubSubDriver{ - pushCh: make(chan []byte, 10), + pushCh: make(chan *pubsub.Message, 10), storage: bst.NewBST(), log: log, } return ps, nil } -func (p *PubSubDriver) Publish(message []byte) error { - p.pushCh <- message +func (p *PubSubDriver) Publish(msg *pubsub.Message) error { + p.pushCh <- msg return nil } -func (p *PubSubDriver) PublishAsync(message []byte) { +func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { go func() { - p.pushCh <- message + p.pushCh <- msg }() } @@ -67,7 +65,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } } -func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { +func (p *PubSubDriver) Next() (*pubsub.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -76,20 +74,13 @@ func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { p.RLock() defer p.RUnlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return nil, err - } - - // push only messages, which are subscribed + // push only messages, which topics are subscibed // TODO better??? - for i := 0; i < len(m.GetTopics()); i++ { - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(m.GetTopics()[i]); ok { - return m, nil - } + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(msg.Topic); ok { + return msg, nil } + return nil, nil } diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go new file mode 100644 index 00000000..5817853c --- /dev/null +++ b/plugins/redis/channel.go @@ -0,0 +1,97 @@ +package redis + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type redisChannel struct { + sync.Mutex + + // redis client + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan *pubsub.Message + + exit chan struct{} +} + +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { + out := make(chan *pubsub.Message, 100) + fi := &redisChannel{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (r *redisChannel) read() { + for { + select { + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the pubsub.Message structure + // payload should be in the redis.message.payload field + + case msg, ok := <-r.pubsub.Channel(): + // channel closed + if !ok { + return + } + + r.out <- &pubsub.Message{ + Topic: msg.Channel, + Payload: utils.AsBytes(msg.Payload), + } + + case <-r.exit: + return + } + } +} + +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) + return nil +} + +func (r *redisChannel) message() *pubsub.Message { + return <-r.out +} diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go deleted file mode 100644 index ac9ebcc2..00000000 --- a/plugins/redis/fanin.go +++ /dev/null @@ -1,102 +0,0 @@ -package redis - -import ( - "context" - "sync" - - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/utils" -) - -type FanIn struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *websocketsv1.Message - - exit chan struct{} -} - -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *websocketsv1.Message, 100) - fi := &FanIn{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (fi *FanIn) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface - // payload should be in the redis.message.payload field - - case msg, ok := <-fi.pubsub.Channel(): - // channel closed - if !ok { - return - } - - m := &websocketsv1.Message{} - err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("message unmarshal") - continue - } - - fi.out <- m - case <-fi.exit: - return - } - } -} - -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) - return nil -} - -func (fi *FanIn) consume() <-chan *websocketsv1.Message { - return fi.out -} diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 66cb8384..320b7443 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,10 +7,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 24c21b55..9d98790b 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -59,8 +59,8 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -// KVProvide provides KV storage implementation over the redis plugin -func (p *Plugin) KVProvide(key string) (kv.Storage, error) { +// KVConstruct provides KV storage implementation over the redis plugin +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("redis_plugin_provide") st, err := NewRedisDriver(p.log, key, p.cfgPlugin) if err != nil { @@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) { return st, nil } -func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { +func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index dbda7ea4..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,11 +6,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" ) type PubSubDriver struct { @@ -18,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -62,7 +60,12 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, MasterName: ps.cfg.MasterName, }) - ps.fanin = newFanIn(ps.universalClient, log) + statusCmd := ps.universalClient.Ping(context.Background()) + if statusCmd.Err() != nil { + return nil, statusCmd.Err() + } + + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -72,47 +75,32 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() } -func (p *PubSubDriver) Publish(msg []byte) error { +func (p *PubSubDriver) Publish(msg *pubsub.Message) error { p.Lock() defer p.Unlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return errors.E(err) + f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) + if f.Err() != nil { + return f.Err() } - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - return f.Err() - } - } return nil } -func (p *PubSubDriver) PublishAsync(msg []byte) { +func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { go func() { p.Lock() defer p.Unlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - p.log.Error("message unmarshal error") - return - } - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - p.log.Error("redis publish", "error", f.Err()) - } + f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) + if f.Err() != nil { + p.log.Error("redis publish", "error", f.Err()) } }() } @@ -128,13 +116,13 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { return err } if res == 0 { - p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i]) continue } } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -160,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -176,7 +164,7 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { panic(err) } - // assighn connections + // assign connections // res expected to be from the sync.Pool for k := range r { res[k] = struct{}{} @@ -184,6 +172,6 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { - return <-p.fanin.consume(), nil +func (p *PubSubDriver) Next() (*pubsub.Message, error) { + return p.channel.message(), nil } diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go index deb4406c..933a12e0 100644 --- a/plugins/websockets/config.go +++ b/plugins/websockets/config.go @@ -4,80 +4,42 @@ import ( "strings" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pool" ) /* -# GLOBAL -redis: - addrs: - - 'localhost:6379' - websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - - pubsubs:["redis", "amqp", "memory"] - # OR local - redis: - addrs: - - 'localhost:6379' - - # path used as websockets path + broker: default + allowed_origin: "*" path: "/ws" */ -type RedisConfig struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - // Config represents configuration for the ws plugin type Config struct { // http path for the websocket - Path string `mapstructure:"path"` - // ["redis", "amqp", "memory"] - PubSubs []string `mapstructure:"pubsubs"` - Middleware []string `mapstructure:"middleware"` - + Path string `mapstructure:"path"` AllowedOrigin string `mapstructure:"allowed_origin"` + Broker string `mapstructure:"broker"` // wildcard origin allowedWOrigins []wildcard allowedOrigins []string allowedAll bool - Redis *RedisConfig `mapstructure:"redis"` - Pool *pool.Config `mapstructure:"pool"` + // Pool with the workers for the websockets + Pool *pool.Config `mapstructure:"pool"` } // InitDefault initialize default values for the ws config -func (c *Config) InitDefault() { +func (c *Config) InitDefault() error { if c.Path == "" { c.Path = "/ws" } - if len(c.PubSubs) == 0 { - // memory used by default - c.PubSubs = append(c.PubSubs, "memory") + // broker is mandatory + if c.Broker == "" { + return errors.Str("broker key should be specified") } if c.Pool == nil { @@ -99,13 +61,6 @@ func (c *Config) InitDefault() { } } - if c.Redis != nil { - if c.Redis.Addrs == nil { - // append default - c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379") - } - } - if c.AllowedOrigin == "" { c.AllowedOrigin = "*" } @@ -115,7 +70,7 @@ func (c *Config) InitDefault() { if origin == "*" { // If "*" is present in the list, turn the whole list into a match all c.allowedAll = true - return + return nil } else if i := strings.IndexByte(origin, '*'); i >= 0 { // Split the origin in two: start and end string without the * w := wildcard{origin[0:i], origin[i+1:]} @@ -123,4 +78,6 @@ func (c *Config) InitDefault() { } else { c.allowedOrigins = append(c.allowedOrigins, origin) } + + return nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 5f904d26..664b4dfd 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,12 +7,12 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) type Response struct { @@ -22,14 +22,15 @@ type Response struct { type Executor struct { sync.Mutex + // raw ws connection conn *connection.Connection log logger.Logger // associated connection ID connID string - // map with the pubsub drivers - pubsub map[string]pubsub.PubSub + // subscriber drivers + sub pubsub.Subscriber actualTopics map[string]struct{} req *http.Request @@ -38,12 +39,12 @@ type Executor struct { // NewExecutor creates protected connection and starts command loop func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { + connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, log: log, - pubsub: pubsubs, + sub: sub, accessValidator: av, actualTopics: make(map[string]struct{}, 10), req: r, @@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit err = json.Unmarshal(data, msg) if err != nil { - e.log.Error("error unmarshal message", "error", err) + e.log.Error("unmarshal message", "error", err) continue } // nil message, continue if msg == nil { - e.log.Warn("get nil message, skipping") + e.log.Warn("nil message, skipping") continue } switch msg.Command { // handle leave case commands.Join: - e.log.Debug("get join command", "msg", msg) + e.log.Debug("received join command", "msg", msg) val, err := e.accessValidator(e.req, msg.Topics...) if err != nil { @@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, errJ := json.Marshal(resp) if errJ != nil { - e.log.Error("error marshal the body", "error", errJ) + e.log.Error("marshal the body", "error", errJ) return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) } errW := e.conn.Write(packet) if errW != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", errW) + e.log.Error("write payload to the connection", "payload", packet, "error", errW) return errors.E(op, fmt.Errorf("%v,%v", err, errW)) } @@ -115,27 +116,25 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } // subscribe to the topic - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Set(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Set(msg.Topics) + if err != nil { + return errors.E(op, err) } // handle leave case commands.Leave: - e.log.Debug("get leave command", "msg", msg) + e.log.Debug("received leave command", "msg", msg) // prepare response resp := &Response{ @@ -145,21 +144,19 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Leave(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Leave(msg.Topics) + if err != nil { + return errors.E(op, err) } case commands.Headers: @@ -170,13 +167,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } } -func (e *Executor) Set(br pubsub.PubSub, topics []string) error { +func (e *Executor) Set(topics []string) error { // associate connection with topics - err := br.Subscribe(e.connID, topics...) + err := e.sub.Subscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(e.connID, topics...) + _ = e.sub.Unsubscribe(e.connID, topics...) return err } @@ -188,11 +185,11 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { return nil } -func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { +func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage - err := br.Unsubscribe(e.connID, topics...) + err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) return err } @@ -207,10 +204,7 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { func (e *Executor) CleanUp() { // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // here - for _, ps := range e.pubsub { - _ = ps.Unsubscribe(e.connID, topic) - } + _ = e.sub.Unsubscribe(e.connID, topic) } // clean up the actualTopics data diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go index e877fad3..bbc49bbb 100644 --- a/plugins/websockets/origin_test.go +++ b/plugins/websockets/origin_test.go @@ -9,9 +9,11 @@ import ( func TestConfig_Origin(t *testing.T) { cfg := &Config{ AllowedOrigin: "*", + Broker: "any", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg)) assert.True(t, isOriginAllowed("http://", cfg)) @@ -27,9 +29,11 @@ func TestConfig_Origin(t *testing.T) { func TestConfig_OriginWildCard(t *testing.T) { cfg := &Config{ AllowedOrigin: "https://*my.site.com", + Broker: "any", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("https://my.site.com", cfg)) assert.False(t, isOriginAllowed("http://", cfg)) @@ -48,9 +52,11 @@ func TestConfig_OriginWildCard(t *testing.T) { func TestConfig_OriginWildCard2(t *testing.T) { cfg := &Config{ AllowedOrigin: "https://my.*.com", + Broker: "any", } - cfg.InitDefault() + err := cfg.InitDefault() + assert.NoError(t, err) assert.True(t, isOriginAllowed("https://my.site.com", cfg)) assert.False(t, isOriginAllowed("http://", cfg)) diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 8b708187..ca5f2f59 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -2,7 +2,6 @@ package websockets import ( "context" - "fmt" "net/http" "sync" "time" @@ -10,14 +9,13 @@ import ( "github.com/fasthttp/websocket" "github.com/google/uuid" json "github.com/json-iterator/go" - endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -26,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - "google.golang.org/protobuf/proto" ) const ( @@ -35,14 +32,14 @@ const ( type Plugin struct { sync.RWMutex - // Collection with all available pubsubs - pubsubs map[string]pubsub.PubSub - psProviders map[string]pubsub.PSProvider + // subscriber+reader interfaces + subReader pubsub.SubReader + // broadcaster + broadcaster broadcast.Broadcaster - cfg *Config - cfgPlugin config.Configurer - log logger.Logger + cfg *Config + log logger.Logger // global connections map connections sync.Map @@ -53,14 +50,16 @@ type Plugin struct { wsUpgrade *websocket.Upgrader serveExit chan struct{} + // workers pool phpPool phpPool.Pool - server server.Server + // server which produces commands to the pool + server server.Server // function used to validate access to the requested resource accessValidator validator.AccessValidatorFn } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error { const op = errors.Op("websockets_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -71,12 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return errors.E(op, err) } - p.cfg.InitDefault() - p.pubsubs = make(map[string]pubsub.PubSub) - p.psProviders = make(map[string]pubsub.PSProvider) - - p.log = log - p.cfgPlugin = cfg + err = p.cfg.InitDefault() + if err != nil { + return errors.E(op, err) + } p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, @@ -88,15 +85,17 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.serveExit = make(chan struct{}) p.server = server - + p.log = log + p.broadcaster = b return nil } func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) const op = errors.Op("websockets_plugin_serve") - - err := p.initPubSubs() + errCh := make(chan error, 1) + // init broadcaster + var err error + p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker) if err != nil { errCh <- errors.E(op, err) return errCh @@ -122,76 +121,26 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() - p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log) - - // run all pubsubs drivers - for _, v := range p.pubsubs { - go func(ps pubsub.PubSub) { - for { - select { - case <-p.serveExit: - return - default: - data, err := ps.Next() - if err != nil { - errCh <- err - return - } - p.workersPool.Queue(data) - } - } - }(v) - } - - return errCh -} - -func (p *Plugin) initPubSubs() error { - for i := 0; i < len(p.cfg.PubSubs); i++ { - // don't need to have a section for the in-memory - if p.cfg.PubSubs[i] == "memory" { - if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { - r, err := provider.PSProvide("") - if err != nil { - return err - } + p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log) - // append default in-memory provider - p.pubsubs["memory"] = r - } - continue - } - // key - memory, redis - if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { - // try local key - switch { - // try local config first - case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): - r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) - if err != nil { - return err - } - - // append redis provider - p.pubsubs[p.cfg.PubSubs[i]] = r - case p.cfgPlugin.Has(p.cfg.PubSubs[i]): - r, err := provider.PSProvide(p.cfg.PubSubs[i]) + // we need here only Reader part of the interface + go func(ps pubsub.Reader) { + for { + select { + case <-p.serveExit: + return + default: + data, err := ps.Next() if err != nil { - return err + errCh <- err + return } - - // append redis provider - p.pubsubs[p.cfg.PubSubs[i]] = r - default: - return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) + p.workersPool.Queue(data) } - } else { - // no such driver - p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) } - } + }(p.subReader) - return nil + return errCh } func (p *Plugin) Stop() error { @@ -208,30 +157,12 @@ func (p *Plugin) Stop() error { return nil } -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.GetPublishers, - } -} - func (p *Plugin) Available() {} -func (p *Plugin) RPC() interface{} { - return &rpc{ - plugin: p, - log: p.log, - } -} - func (p *Plugin) Name() string { return PluginName } -// GetPublishers collects all pubsubs -func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) { - p.psProviders[name.Name()] = pub -} - func (p *Plugin) Middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != p.cfg.Path { @@ -277,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.connections.Store(connectionID, safeConn) // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) err = e.StartCommandLoop() @@ -361,55 +292,6 @@ func (p *Plugin) Reset() error { return nil } -// Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(m []byte) error { - p.Lock() - defer p.Unlock() - - msg := &websocketsv1.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - return err - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.pubsubs[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) - } - } - return nil -} - -func (p *Plugin) PublishAsync(m []byte) { - go func() { - p.Lock() - defer p.Unlock() - msg := &websocketsv1.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - p.log.Error("message unmarshal") - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.pubsubs[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - p.log.Error("publish async error", "error", err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) - } - } - }() -} - func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { const op = errors.Op("access_validator") diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index a196d1f0..752ba3ce 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,6 @@ import ( "sync" json "github.com/json-iterator/go" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -12,21 +11,21 @@ import ( ) type WorkersPool struct { - storage map[string]pubsub.PubSub + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger - queue chan *websocketsv1.Message + queue chan *pubsub.Message exit chan struct{} } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + queue: make(chan *pubsub.Message, 100), + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -43,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { +func (wp *WorkersPool) Queue(msg *pubsub.Message) { wp.queue <- msg } @@ -83,63 +82,48 @@ func (wp *WorkersPool) do() { //nolint:gocognit return } _ = msg - if msg == nil { - continue - } - if len(msg.GetTopics()) == 0 { + if msg == nil || msg.Topic == "" { continue } - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) + // get free map + res := wp.get() + + // get connections for the particular topic + wp.subscriber.Connections(msg.Topic, res) + + if len(res) == 0 { + wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) + wp.put(res) continue } - // send a message to every topic - for i := 0; i < len(msg.GetTopics()); i++ { - // get free map - res := wp.get() + // res is a map with a connectionsID + for connID := range res { + c, ok := wp.connections.Load(connID) + if !ok { + wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic) + wp.put(res) + continue + } - // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) + d, err := json.Marshal(&Response{ + Topic: msg.Topic, + Payload: utils.AsString(msg.Payload), + }) - if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) + if err != nil { + wp.log.Error("error marshaling response", "error", err) wp.put(res) - continue + break } - // res is a map with a connectionsID - for topic := range res { - c, ok := wp.connections.Load(topic) - if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - wp.put(res) - continue - } - - response := &Response{ - Topic: msg.GetTopics()[i], - Payload: utils.AsString(msg.GetPayload()), - } - - d, err := json.Marshal(response) - if err != nil { - wp.log.Error("error marshaling response", "error", err) - wp.put(res) - break - } - - // put data into the bytes buffer - err = c.(*connection.Connection).Write(d) - if err != nil { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - } - wp.put(res) - continue - } + // put data into the bytes buffer + err = c.(*connection.Connection).Write(d) + if err != nil { + wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic) + wp.put(res) + continue } } case <-wp.exit: diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go deleted file mode 100644 index 341e0b2a..00000000 --- a/plugins/websockets/rpc.go +++ /dev/null @@ -1,75 +0,0 @@ -package websockets - -import ( - "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" -) - -// rpc collectors struct -type rpc struct { - plugin *Plugin - log logger.Logger -} - -// Publish ... msg is a proto decoded payload -// see: pkg/pubsub/message.fbs -func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("broadcast_publish") - - // just return in case of nil message - if in == nil { - out.Ok = false - return nil - } - - r.log.Debug("message published", "msg", in.Messages) - - msgLen := len(in.GetMessages()) - - for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - return errors.E(op, err) - } - - err = r.plugin.Publish(bb) - if err != nil { - out.Ok = false - return errors.E(op, err) - } - } - - out.Ok = true - return nil -} - -// PublishAsync ... -// see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("publish_async") - - // just return in case of nil message - if in == nil { - out.Ok = false - return nil - } - - r.log.Debug("message published", "msg", in.Messages) - - msgLen := len(in.GetMessages()) - - for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - out.Ok = false - return errors.E(op, err) - } - - r.plugin.PublishAsync(bb) - } - - out.Ok = true - return nil -} |