diff options
author | Valery Piashchynski <[email protected]> | 2021-05-18 15:36:06 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-18 15:36:06 +0300 |
commit | f85172106b4723b705aa75c3c310e8cebd050a8d (patch) | |
tree | aa98200d5d01776d4328c31c79b8af52986a4375 /plugins | |
parent | 60a3793f83c672e24283c009f53e780a4932be50 (diff) |
- Add protected connection
- Update arch diagram
- Update interfaces
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/broadcast/config.go | 16 | ||||
-rw-r--r-- | plugins/broadcast/interface.go | 3 | ||||
-rw-r--r-- | plugins/broadcast/ws/commands/join.go | 10 | ||||
-rw-r--r-- | plugins/broadcast/ws/config.go | 25 | ||||
-rw-r--r-- | plugins/broadcast/ws/connection.go | 69 | ||||
-rw-r--r-- | plugins/broadcast/ws/plugin.go | 71 | ||||
-rw-r--r-- | plugins/broadcast/ws/subscriber.go | 34 | ||||
-rw-r--r-- | plugins/config/interface.go | 4 | ||||
-rw-r--r-- | plugins/kv/storage.go | 2 |
9 files changed, 230 insertions, 4 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 92f7951d..03bf6510 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -1,6 +1,22 @@ package broadcast +/* +broadcast: + ws-us-region-1: + subscriber: ws + path: "/ws" + + driver: redis + address: + - 6379 + db: 0 +*/ + + +// Config represents configuration for the ws plugin type Config struct { + // Sections represent particular broadcast plugin section + Sections map[string]interface{} `mapstructure:"sections"` } func (c *Config) InitDefaults() { diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 716b3aac..3ed8b412 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -18,7 +18,8 @@ type Subscriber interface { } type Storage interface { - Store() + Store(topics ...string) + StorePattern(pattern string) } type Publisher interface { diff --git a/plugins/broadcast/ws/commands/join.go b/plugins/broadcast/ws/commands/join.go new file mode 100644 index 00000000..25943f0a --- /dev/null +++ b/plugins/broadcast/ws/commands/join.go @@ -0,0 +1,10 @@ +package commands + +// Join command to save the connection +type Join struct { + Command string `mapstructure:"command"` +} + +func JoinCommand() { + +} diff --git a/plugins/broadcast/ws/config.go b/plugins/broadcast/ws/config.go index 98592950..1d4132b4 100644 --- a/plugins/broadcast/ws/config.go +++ b/plugins/broadcast/ws/config.go @@ -1 +1,26 @@ package ws + +/* +broadcast: + ws-us-region-1: + subscriber: ws + path: "/ws" + + driver: redis + address: + - 6379 + db: 0 +*/ + +// Config represents configuration for the ws plugin +type Config struct { + // http path for the websocket + Path string `mapstructure:"Path"` +} + +// InitDefault initialize default values for the ws config +func (c *Config) InitDefault() { + if c.Path == "" { + c.Path = "/ws" + } +} diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection.go new file mode 100644 index 00000000..9f7bf00e --- /dev/null +++ b/plugins/broadcast/ws/connection.go @@ -0,0 +1,69 @@ +package ws + +import ( + "sync" + + "github.com/gofiber/websocket/v2" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Connection represents wrapped and safe to use from the different threads websocket connection +type Connection struct { + sync.RWMutex + log logger.Logger + conn *websocket.Conn +} + +func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { + return &Connection{ + conn: wsConn, + log: log, + } +} + +func (c *Connection) Write(mt int, data []byte) error { + c.Lock() + defer c.Unlock() + + const op = errors.Op("websocket_write") + // handle a case when a goroutine tried to write into the closed connection + defer func() { + if r := recover(); r != nil { + c.log.Warn("panic handled, tried to write into the closed connection") + } + }() + + err := c.conn.WriteMessage(mt, data) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *Connection) Read() (int, []byte, error) { + c.RLock() + defer c.RUnlock() + const op = errors.Op("websocket_read") + + mt, data, err := c.conn.ReadMessage() + if err != nil { + return -1, nil, errors.E(op, err) + } + + return mt, data, nil +} + +func (c *Connection) Close() error { + c.Lock() + defer c.Unlock() + const op = errors.Op("websocket_close") + + err := c.conn.Close() + if err != nil { + return errors.E(op, err) + } + + return nil +} diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go index 98592950..c9a97606 100644 --- a/plugins/broadcast/ws/plugin.go +++ b/plugins/broadcast/ws/plugin.go @@ -1 +1,72 @@ package ws + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + // + RootPluginName = "broadcast" + // + PluginName = "websockets" +) + +type Plugin struct { + // logger + log logger.Logger + // configurer plugin + cfg config.Configurer +} + + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("ws_plugin_init") + + // check for the configuration section existence + if !cfg.Has(RootPluginName) { + return errors.E(op, errors.Disabled, errors.Str("broadcast plugin section should exists in the configuration")) + } + + p.cfg = cfg + p.log = log + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error) + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Provides() []interface{} { + return []interface{}{ + p.Websocket, + } +} + +// Websocket method should provide the Subscriber implementation to the broadcast +func (p *Plugin) Websocket() (broadcast.Subscriber, error) { + const op = errors.Op("websocket_subscriber_provide") + ws, err := NewWSSubscriber() + if err != nil { + return nil, errors.E(op, err) + } + + return ws, nil +} + + + +func (p *Plugin) Available(){} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go index 98592950..2039cf95 100644 --- a/plugins/broadcast/ws/subscriber.go +++ b/plugins/broadcast/ws/subscriber.go @@ -1 +1,35 @@ package ws + +import "github.com/spiral/roadrunner/v2/plugins/broadcast" + +type Subscriber struct { + connections map[string]*Connection + storage broadcast.Storage +} + +func NewWSSubscriber() (broadcast.Subscriber, error) { + m := make(map[string]*Connection) + return &Subscriber{ + connections: m, + }, nil +} + +func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") + + + + +} + +func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} + +func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { + panic("implement me") +} + +func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { + panic("implement me") +} diff --git a/plugins/config/interface.go b/plugins/config/interface.go index 59ad981f..b3854e09 100644 --- a/plugins/config/interface.go +++ b/plugins/config/interface.go @@ -11,7 +11,7 @@ type Configurer interface { // } UnmarshalKey(name string, out interface{}) error - // Unmarshal unmarshals the config into a Struct. Make sure that the tags + // Unmarshal unmarshal the config into a Struct. Make sure that the tags // on the fields of the structure are properly set. Unmarshal(out interface{}) error @@ -24,6 +24,6 @@ type Configurer interface { // Has checks if config section exists. Has(name string) bool - // Returns General section. Read-only + // GetCommonConfig returns General section. Read-only GetCommonConfig() *General } diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index fe2fa10b..319915c5 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -90,7 +90,7 @@ func (p *Plugin) Serve() chan error { return errCh } - // config key for the particular sub-driver + // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the cofiguration switch v.(map[string]interface{})[driver] { |