summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/bst/bst.go (renamed from plugins/memory/bst/bst.go)8
-rw-r--r--pkg/bst/bst_test.go (renamed from plugins/memory/bst/bst_test.go)0
-rw-r--r--pkg/bst/doc.go7
-rw-r--r--pkg/bst/interface.go (renamed from plugins/memory/bst/interface.go)0
-rw-r--r--pkg/pubsub/message.go38
-rw-r--r--plugins/http/plugin.go224
-rw-r--r--plugins/http/serve.go70
-rw-r--r--plugins/memory/config.go8
-rw-r--r--plugins/memory/driver.go28
-rw-r--r--plugins/memory/plugin.go61
-rw-r--r--plugins/redis/fanin.go6
-rw-r--r--plugins/websockets/connection/connection.go2
-rw-r--r--plugins/websockets/executor/executor.go33
-rw-r--r--plugins/websockets/plugin.go41
-rw-r--r--plugins/websockets/storage/storage.go2
-rw-r--r--tests/plugins/http/handler_test.go2
-rw-r--r--tests/plugins/http/parse_test.go2
-rw-r--r--tests/plugins/http/response_test.go2
-rw-r--r--tests/plugins/http/uploads_test.go2
19 files changed, 232 insertions, 304 deletions
diff --git a/plugins/memory/bst/bst.go b/pkg/bst/bst.go
index 3060ff11..8477ceee 100644
--- a/plugins/memory/bst/bst.go
+++ b/pkg/bst/bst.go
@@ -76,7 +76,7 @@ func (b *BST) Remove(uuid string, topic string) {
func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
curr := b
for curr != nil {
- if topic < curr.topic {
+ if topic < curr.topic { //nolint:gocritic
parent = curr
curr = curr.left
} else if topic > curr.topic {
@@ -90,11 +90,11 @@ func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:go
}
}
- if curr.left != nil && curr.right != nil {
+ if curr.left != nil && curr.right != nil { //nolint:gocritic
curr.topic, curr.uuids = curr.right.traverseForMinString()
curr.right.removeHelper(curr.topic, uuid, curr)
} else if parent == nil {
- if curr.left != nil {
+ if curr.left != nil { //nolint:gocritic
curr.topic = curr.left.topic
curr.uuids = curr.left.uuids
@@ -106,7 +106,7 @@ func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:go
curr.left = curr.right.left
curr.right = curr.right.right
- } else {
+ } else { //nolint:staticcheck
// single node tree
}
} else if parent.left == curr {
diff --git a/plugins/memory/bst/bst_test.go b/pkg/bst/bst_test.go
index e8a13760..e8a13760 100644
--- a/plugins/memory/bst/bst_test.go
+++ b/pkg/bst/bst_test.go
diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go
new file mode 100644
index 00000000..abb7e6e9
--- /dev/null
+++ b/pkg/bst/doc.go
@@ -0,0 +1,7 @@
+package bst
+
+/*
+Binary search tree for the pubsub
+
+The vertex may have one or multiply topics associated with the single websocket connection UUID
+*/
diff --git a/plugins/memory/bst/interface.go b/pkg/bst/interface.go
index ecf40414..ecf40414 100644
--- a/plugins/memory/bst/interface.go
+++ b/pkg/bst/interface.go
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
index ab74eb98..2536aece 100644
--- a/pkg/pubsub/message.go
+++ b/pkg/pubsub/message.go
@@ -6,59 +6,37 @@ import (
type Msg struct {
// Topic message been pushed into.
- T []string `json:"topic"`
+ Topics_ []string `json:"topic"`
// Command (join, leave, headers)
- C string `json:"command"`
+ Command_ string `json:"command"`
// Broker (redis, memory)
- B string `json:"broker"`
+ Broker_ string `json:"broker"`
// Payload to be broadcasted
- P []byte `json:"payload"`
+ Payload_ []byte `json:"payload"`
}
-//func (m Msg) UnmarshalBinary(data []byte) error {
-// //Use default gob decoder
-// reader := bytes.NewReader(data)
-// dec := gob.NewDecoder(reader)
-// if err := dec.Decode(&m); err != nil {
-// return err
-// }
-//
-// return nil
-//}
-
func (m *Msg) MarshalBinary() ([]byte, error) {
- //buf := new(bytes.Buffer)
- //
- //for i := 0; i < len(m.T); i++ {
- // buf.WriteString(m.T[i])
- //}
- //
- //buf.WriteString(m.C)
- //buf.WriteString(m.B)
- //buf.Write(m.P)
-
return json.Marshal(m)
-
}
// Payload in raw bytes
func (m *Msg) Payload() []byte {
- return m.P
+ return m.Payload_
}
// Command for the connection
func (m *Msg) Command() string {
- return m.C
+ return m.Command_
}
// Topics to subscribe
func (m *Msg) Topics() []string {
- return m.T
+ return m.Topics_
}
func (m *Msg) Broker() string {
- return m.B
+ return m.Broker_
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index d5324246..8bcffb63 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -71,47 +71,47 @@ type Plugin struct {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
const op = errors.Op("http_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, err)
}
- err = s.cfg.InitDefaults()
+ err = p.cfg.InitDefaults()
if err != nil {
return errors.E(op, err)
}
// rr logger (via plugin)
- s.log = rrLogger
+ p.log = rrLogger
// use time and date in UTC format
- s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
+ p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
- s.mdwr = make(map[string]Middleware)
+ p.mdwr = make(map[string]Middleware)
- if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() {
+ if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() {
return errors.E(op, errors.Disabled)
}
// init if nil
- if s.cfg.Env == nil {
- s.cfg.Env = make(map[string]string)
+ if p.cfg.Env == nil {
+ p.cfg.Env = make(map[string]string)
}
- s.cfg.Env[RrMode] = "http"
- s.server = server
+ p.cfg.Env[RrMode] = "http"
+ p.server = server
return nil
}
-func (s *Plugin) logCallback(event interface{}) {
+func (p *Plugin) logCallback(event interface{}) {
if ev, ok := event.(handler.ResponseEvent); ok {
- s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
+ p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
"remote", ev.Request.RemoteAddr,
"elapsed", ev.Elapsed().String(),
)
@@ -119,60 +119,60 @@ func (s *Plugin) logCallback(event interface{}) {
}
// Serve serves the svc.
-func (s *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 2)
// run whole process in the goroutine
go func() {
// protect http initialization
- s.Lock()
- s.serve(errCh)
- s.Unlock()
+ p.Lock()
+ p.serve(errCh)
+ p.Unlock()
}()
return errCh
}
-func (s *Plugin) serve(errCh chan error) {
+func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler.AddListener(s.logCallback)
+ p.handler.AddListener(p.logCallback)
- if s.cfg.EnableHTTP() {
- if s.cfg.EnableH2C() {
- s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog}
+ if p.cfg.EnableHTTP() {
+ if p.cfg.EnableH2C() {
+ p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog}
} else {
- s.http = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ p.http = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
}
- if s.cfg.EnableTLS() {
- s.https = s.initSSL()
- if s.cfg.SSLConfig.RootCA != "" {
- err = s.appendRootCa()
+ if p.cfg.EnableTLS() {
+ p.https = p.initSSL()
+ if p.cfg.SSLConfig.RootCA != "" {
+ err = p.appendRootCa()
if err != nil {
errCh <- errors.E(op, err)
return
@@ -180,102 +180,102 @@ func (s *Plugin) serve(errCh chan error) {
}
// if HTTP2Config not nil
- if s.cfg.HTTP2Config != nil {
- if err := s.initHTTP2(); err != nil {
+ if p.cfg.HTTP2Config != nil {
+ if err := p.initHTTP2(); err != nil {
errCh <- errors.E(op, err)
return
}
}
}
- if s.cfg.EnableFCGI() {
- s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ if p.cfg.EnableFCGI() {
+ p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
// start http, https and fcgi servers if requested in the config
go func() {
- s.serveHTTP(errCh)
+ p.serveHTTP(errCh)
}()
go func() {
- s.serveHTTPS(errCh)
+ p.serveHTTPS(errCh)
}()
go func() {
- s.serveFCGI(errCh)
+ p.serveFCGI(errCh)
}()
}
// Stop stops the http.
-func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Stop() error {
+ p.Lock()
+ defer p.Unlock()
var err error
- if s.fcgi != nil {
- err = s.fcgi.Shutdown(context.Background())
+ if p.fcgi != nil {
+ err = p.fcgi.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the fcgi server", "error", err)
+ p.log.Error("error shutting down the fcgi server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
- if s.https != nil {
- err = s.https.Shutdown(context.Background())
+ if p.https != nil {
+ err = p.https.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the https server", "error", err)
+ p.log.Error("error shutting down the https server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
- if s.http != nil {
- err = s.http.Shutdown(context.Background())
+ if p.http != nil {
+ err = p.http.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the http server", "error", err)
+ p.log.Error("error shutting down the http server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
// check for safety
- if s.pool != nil {
- s.pool.Destroy(context.Background())
+ if p.pool != nil {
+ p.pool.Destroy(context.Background())
}
return err
}
// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
-func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if headerContainsUpgrade(r) {
http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
return
}
- if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect {
- s.redirect(w, r)
+ if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect {
+ p.redirect(w, r)
return
}
- if s.https != nil && r.TLS != nil {
+ if p.https != nil && r.TLS != nil {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
r = attributes.Init(r)
// protect the case, when user sendEvent Reset and we are replacing handler with pool
- s.RLock()
- s.handler.ServeHTTP(w, r)
- s.RUnlock()
+ p.RLock()
+ p.handler.ServeHTTP(w, r)
+ p.RUnlock()
}
// Workers returns slice with the process states for the workers
-func (s *Plugin) Workers() []process.State {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Workers() []process.State {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
ps := make([]process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
@@ -290,74 +290,74 @@ func (s *Plugin) Workers() []process.State {
}
// internal
-func (s *Plugin) workers() []worker.BaseProcess {
- return s.pool.Workers()
+func (p *Plugin) workers() []worker.BaseProcess {
+ return p.pool.Workers()
}
// Name returns endure.Named interface implementation
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Reset destroys the old pool and replaces it with new one, waiting for old pool to die
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
const op = errors.Op("http_plugin_reset")
- s.log.Info("HTTP plugin got restart request. Restarting...")
- s.pool.Destroy(context.Background())
- s.pool = nil
+ p.log.Info("HTTP plugin got restart request. Restarting...")
+ p.pool.Destroy(context.Background())
+ p.pool = nil
var err error
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP workers Pool successfully restarted")
+ p.log.Info("HTTP workers Pool successfully restarted")
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP handler listeners successfully re-added")
- s.handler.AddListener(s.logCallback)
+ p.log.Info("HTTP handler listeners successfully re-added")
+ p.handler.AddListener(p.logCallback)
- s.log.Info("HTTP plugin successfully restarted")
+ p.log.Info("HTTP plugin successfully restarted")
return nil
}
// Collects collecting http middlewares
-func (s *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- s.AddMiddleware,
+ p.AddMiddleware,
}
}
// AddMiddleware is base requirement for the middleware (name and Middleware)
-func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
- s.mdwr[name.Name()] = m
+func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) {
+ p.mdwr[name.Name()] = m
}
// Status return status of the particular plugin
-func (s *Plugin) Status() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Status() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -372,14 +372,14 @@ func (s *Plugin) Status() status.Status {
}
// Ready return readiness status of the particular plugin
-func (s *Plugin) Ready() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Ready() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin's worker pool is ready
+ // we assume, that plugin'p worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
@@ -393,4 +393,4 @@ func (s *Plugin) Ready() status.Status {
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 734860f5..bf1ccafe 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -17,46 +17,46 @@ import (
"golang.org/x/sys/cpu"
)
-func (s *Plugin) serveHTTP(errCh chan error) {
- if s.http == nil {
+func (p *Plugin) serveHTTP(errCh chan error) {
+ if p.http == nil {
return
}
const op = errors.Op("serveHTTP")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.http, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.Address)
+ l, err := utils.CreateListener(p.cfg.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.http.Serve(l)
+ err = p.http.Serve(l)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) serveHTTPS(errCh chan error) {
- if s.https == nil {
+func (p *Plugin) serveHTTPS(errCh chan error) {
+ if p.https == nil {
return
}
const op = errors.Op("serveHTTPS")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.SSLConfig.Address)
+ l, err := utils.CreateListener(p.cfg.SSLConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.https.ServeTLS(
+ err = p.https.ServeTLS(
l,
- s.cfg.SSLConfig.Cert,
- s.cfg.SSLConfig.Key,
+ p.cfg.SSLConfig.Cert,
+ p.cfg.SSLConfig.Key,
)
if err != nil && err != http.ErrServerClosed {
@@ -66,34 +66,34 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
}
// serveFCGI starts FastCGI server.
-func (s *Plugin) serveFCGI(errCh chan error) {
- if s.fcgi == nil {
+func (p *Plugin) serveFCGI(errCh chan error) {
+ if p.fcgi == nil {
return
}
const op = errors.Op("serveFCGI")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.FCGIConfig.Address)
+ l, err := utils.CreateListener(p.cfg.FCGIConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = fcgi.Serve(l, s.fcgi.Handler)
+ err = fcgi.Serve(l, p.fcgi.Handler)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
target := &url.URL{
Scheme: HTTPSScheme,
// host or host:port
- Host: s.tlsAddr(r.Host, false),
+ Host: p.tlsAddr(r.Host, false),
Path: r.URL.Path,
RawQuery: r.URL.RawQuery,
}
@@ -111,7 +111,7 @@ func headerContainsUpgrade(r *http.Request) bool {
}
// append RootCA to the https server TLS config
-func (s *Plugin) appendRootCa() error {
+func (p *Plugin) appendRootCa() error {
const op = errors.Op("http_plugin_append_root_ca")
rootCAs, err := x509.SystemCertPool()
if err != nil {
@@ -121,7 +121,7 @@ func (s *Plugin) appendRootCa() error {
rootCAs = x509.NewCertPool()
}
- CA, err := os.ReadFile(s.cfg.SSLConfig.RootCA)
+ CA, err := os.ReadFile(p.cfg.SSLConfig.RootCA)
if err != nil {
return err
}
@@ -137,13 +137,13 @@ func (s *Plugin) appendRootCa() error {
InsecureSkipVerify: false,
RootCAs: rootCAs,
}
- s.http.TLSConfig = cfg
+ p.http.TLSConfig = cfg
return nil
}
// Init https server
-func (s *Plugin) initSSL() *http.Server {
+func (p *Plugin) initSSL() *http.Server {
var topCipherSuites []uint16
var defaultCipherSuitesTLS13 []uint16
@@ -193,9 +193,9 @@ func (s *Plugin) initSSL() *http.Server {
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
sslServer := &http.Server{
- Addr: s.tlsAddr(s.cfg.Address, true),
- Handler: s,
- ErrorLog: s.stdLog,
+ Addr: p.tlsAddr(p.cfg.Address, true),
+ Handler: p,
+ ErrorLog: p.stdLog,
TLSConfig: &tls.Config{
CurvePreferences: []tls.CurveID{
tls.CurveP256,
@@ -213,19 +213,19 @@ func (s *Plugin) initSSL() *http.Server {
}
// init http/2 server
-func (s *Plugin) initHTTP2() error {
- return http2.ConfigureServer(s.https, &http2.Server{
- MaxConcurrentStreams: s.cfg.HTTP2Config.MaxConcurrentStreams,
+func (p *Plugin) initHTTP2() error {
+ return http2.ConfigureServer(p.https, &http2.Server{
+ MaxConcurrentStreams: p.cfg.HTTP2Config.MaxConcurrentStreams,
})
}
// tlsAddr replaces listen or host port with port configured by SSLConfig config.
-func (s *Plugin) tlsAddr(host string, forcePort bool) string {
+func (p *Plugin) tlsAddr(host string, forcePort bool) string {
// remove current forcePort first
host = strings.Split(host, ":")[0]
- if forcePort || s.cfg.SSLConfig.Port != 443 {
- host = fmt.Sprintf("%s:%v", host, s.cfg.SSLConfig.Port)
+ if forcePort || p.cfg.SSLConfig.Port != 443 {
+ host = fmt.Sprintf("%s:%v", host, p.cfg.SSLConfig.Port)
}
return host
diff --git a/plugins/memory/config.go b/plugins/memory/config.go
deleted file mode 100644
index 08dd9fc3..00000000
--- a/plugins/memory/config.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package memory
-
-// Config for the memory driver is empty, it's just a placeholder
-type Config struct {
- Path string `mapstructure:"path"`
-}
-
-func (c *Config) InitDefaults() {}
diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go
deleted file mode 100644
index 5a96e773..00000000
--- a/plugins/memory/driver.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package memory
-
-import (
- "github.com/spiral/roadrunner/v2/plugins/memory/bst"
-)
-
-type Driver struct {
- tree bst.Storage
-}
-
-func NewInMemoryDriver() bst.Storage {
- b := &Driver{
- tree: bst.NewBST(),
- }
- return b
-}
-
-func (d *Driver) Insert(uuid string, topic string) {
- d.tree.Insert(uuid, topic)
-}
-
-func (d *Driver) Remove(uuid, topic string) {
- d.tree.Remove(uuid, topic)
-}
-
-func (d *Driver) Get(topic string) map[string]struct{} {
- return d.tree.Get(topic)
-}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 5efd5522..2ad041aa 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -1,9 +1,9 @@
package memory
import (
- "github.com/spiral/errors"
+ "sync"
+
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -13,28 +13,16 @@ const (
type Plugin struct {
log logger.Logger
- cfg *Config
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("memory_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- p.log = log
- return nil
-}
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("memory_plugin_serve")
- errCh := make(chan error)
-
- return errCh
+ // channel with the messages from the RPC
+ pushCh chan pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
}
-func (p *Plugin) Stop() error {
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan pubsub.Message, 100)
return nil
}
@@ -47,21 +35,42 @@ func (p *Plugin) Name() string {
}
func (p *Plugin) Publish(messages []pubsub.Message) error {
- panic("implement me")
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
}
func (p *Plugin) PublishAsync(messages []pubsub.Message) {
- panic("implement me")
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
}
func (p *Plugin) Subscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
}
func (p *Plugin) Unsubscribe(topics ...string) error {
- panic("implement me")
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
}
func (p *Plugin) Next() (pubsub.Message, error) {
- panic("implement me")
+ msg := <-p.pushCh
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics()); i++ {
+ if _, ok := p.topics.Load(msg.Topics()[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 29016720..8e924b2d 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -56,9 +56,9 @@ func (fi *FanIn) AddChannel(topics ...string) error {
func (fi *FanIn) read() {
for {
select {
- //here we receive message from us (which we sent before in Publish)
- //it should be compatible with the websockets.Msg interface
- //payload should be in the redis.message.payload field
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the websockets.Msg interface
+ // payload should be in the redis.message.payload field
case msg, ok := <-fi.pubsub.Channel():
// channel closed
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
index 5eb30c61..2b847173 100644
--- a/plugins/websockets/connection/connection.go
+++ b/plugins/websockets/connection/connection.go
@@ -43,8 +43,6 @@ func (c *Connection) Write(mt int, data []byte) error {
}
func (c *Connection) Read() (int, []byte, error) {
- //c.RLock()
- //defer c.RUnlock()
const op = errors.Op("websocket_read")
mt, data, err := c.conn.ReadMessage()
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 391c9a8c..9ef5e40a 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -22,11 +22,11 @@ type Executor struct {
// associated connection ID
connID string
- pubsub pubsub.PubSub
+ pubsub map[string]pubsub.PubSub
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error {
switch msg.Command() {
// handle leave
case commands.Join:
- // TODO access validators model update
- //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
- //// validation error
- //if err != nil {
- // e.log.Error("validation error", "error", err)
- //
- // resp := &Response{
- // Topic: "#join",
- // Payload: msg.Topics(),
- // }
- //
- // packet, err := json.Marshal(resp)
- // if err != nil {
- // e.log.Error("error marshal the body", "error", err)
- // return err
- // }
- //
- // err = e.conn.Write(websocket.BinaryMessage, packet)
- // if err != nil {
- // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
- // continue
- // }
- //
- // continue
- //}
// associate connection with topics
e.storage.Store(e.connID, msg.Topics())
@@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Subscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue
@@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Unsubscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index a247da69..bc5028e6 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -58,28 +58,25 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
- go func() {
- ps := p.pubsubs["redis"]
-
- for {
- // get message
- // get topic
- // get connection uuid from the storage by the topic
- // write payload into the connection
- // do that in the workers pool
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- if data == nil {
- continue
- }
+ // run all pubsubs drivers
+ for _, v := range p.pubsubs {
+ go func(ps pubsub.PubSub) {
+ for {
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- p.workersPool.Queue(data)
- }
- }()
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }(v)
+ }
return errCh
}
@@ -156,7 +153,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// Executor wraps a connection to have a safe abstraction
p.Lock()
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs)
p.Unlock()
p.log.Info("websocket client connected", "uuid", connectionID)
@@ -169,6 +166,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+// Publish is an entry point to the websocket PUBSUB
func (p *Plugin) Publish(msg []pubsub.Message) error {
p.Lock()
defer p.Unlock()
@@ -199,7 +197,6 @@ func (p *Plugin) PublishAsync(msg []pubsub.Message) {
p.log.Error("publish async error", "error", err)
return
}
-
}
}
}()
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
index a7e49207..34f53cfd 100644
--- a/plugins/websockets/storage/storage.go
+++ b/plugins/websockets/storage/storage.go
@@ -3,7 +3,7 @@ package storage
import (
"sync"
- "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+ "github.com/spiral/roadrunner/v2/pkg/bst"
)
type Storage struct {
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 1fa29783..f6533dc4 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -12,7 +12,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
"github.com/stretchr/testify/assert"
diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go
index 32738ae0..d75620f3 100644
--- a/tests/plugins/http/parse_test.go
+++ b/tests/plugins/http/parse_test.go
@@ -3,7 +3,7 @@ package http
import (
"testing"
- "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
)
var samples = []struct {
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 5b72df40..276c22ef 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index 82843d4e..903a930a 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -18,7 +18,7 @@ import (
j "github.com/json-iterator/go"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
"github.com/stretchr/testify/assert"
)