diff options
-rw-r--r-- | pkg/bst/bst.go (renamed from plugins/memory/bst/bst.go) | 8 | ||||
-rw-r--r-- | pkg/bst/bst_test.go (renamed from plugins/memory/bst/bst_test.go) | 0 | ||||
-rw-r--r-- | pkg/bst/doc.go | 7 | ||||
-rw-r--r-- | pkg/bst/interface.go (renamed from plugins/memory/bst/interface.go) | 0 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 38 | ||||
-rw-r--r-- | plugins/http/plugin.go | 224 | ||||
-rw-r--r-- | plugins/http/serve.go | 70 | ||||
-rw-r--r-- | plugins/memory/config.go | 8 | ||||
-rw-r--r-- | plugins/memory/driver.go | 28 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 61 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 6 | ||||
-rw-r--r-- | plugins/websockets/connection/connection.go | 2 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 33 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 41 | ||||
-rw-r--r-- | plugins/websockets/storage/storage.go | 2 | ||||
-rw-r--r-- | tests/plugins/http/handler_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/http/parse_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/http/response_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/http/uploads_test.go | 2 |
19 files changed, 232 insertions, 304 deletions
diff --git a/plugins/memory/bst/bst.go b/pkg/bst/bst.go index 3060ff11..8477ceee 100644 --- a/plugins/memory/bst/bst.go +++ b/pkg/bst/bst.go @@ -76,7 +76,7 @@ func (b *BST) Remove(uuid string, topic string) { func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit curr := b for curr != nil { - if topic < curr.topic { + if topic < curr.topic { //nolint:gocritic parent = curr curr = curr.left } else if topic > curr.topic { @@ -90,11 +90,11 @@ func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:go } } - if curr.left != nil && curr.right != nil { + if curr.left != nil && curr.right != nil { //nolint:gocritic curr.topic, curr.uuids = curr.right.traverseForMinString() curr.right.removeHelper(curr.topic, uuid, curr) } else if parent == nil { - if curr.left != nil { + if curr.left != nil { //nolint:gocritic curr.topic = curr.left.topic curr.uuids = curr.left.uuids @@ -106,7 +106,7 @@ func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:go curr.left = curr.right.left curr.right = curr.right.right - } else { + } else { //nolint:staticcheck // single node tree } } else if parent.left == curr { diff --git a/plugins/memory/bst/bst_test.go b/pkg/bst/bst_test.go index e8a13760..e8a13760 100644 --- a/plugins/memory/bst/bst_test.go +++ b/pkg/bst/bst_test.go diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go new file mode 100644 index 00000000..abb7e6e9 --- /dev/null +++ b/pkg/bst/doc.go @@ -0,0 +1,7 @@ +package bst + +/* +Binary search tree for the pubsub + +The vertex may have one or multiply topics associated with the single websocket connection UUID +*/ diff --git a/plugins/memory/bst/interface.go b/pkg/bst/interface.go index ecf40414..ecf40414 100644 --- a/plugins/memory/bst/interface.go +++ b/pkg/bst/interface.go diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go index ab74eb98..2536aece 100644 --- a/pkg/pubsub/message.go +++ b/pkg/pubsub/message.go @@ -6,59 +6,37 @@ import ( type Msg struct { // Topic message been pushed into. - T []string `json:"topic"` + Topics_ []string `json:"topic"` // Command (join, leave, headers) - C string `json:"command"` + Command_ string `json:"command"` // Broker (redis, memory) - B string `json:"broker"` + Broker_ string `json:"broker"` // Payload to be broadcasted - P []byte `json:"payload"` + Payload_ []byte `json:"payload"` } -//func (m Msg) UnmarshalBinary(data []byte) error { -// //Use default gob decoder -// reader := bytes.NewReader(data) -// dec := gob.NewDecoder(reader) -// if err := dec.Decode(&m); err != nil { -// return err -// } -// -// return nil -//} - func (m *Msg) MarshalBinary() ([]byte, error) { - //buf := new(bytes.Buffer) - // - //for i := 0; i < len(m.T); i++ { - // buf.WriteString(m.T[i]) - //} - // - //buf.WriteString(m.C) - //buf.WriteString(m.B) - //buf.Write(m.P) - return json.Marshal(m) - } // Payload in raw bytes func (m *Msg) Payload() []byte { - return m.P + return m.Payload_ } // Command for the connection func (m *Msg) Command() string { - return m.C + return m.Command_ } // Topics to subscribe func (m *Msg) Topics() []string { - return m.T + return m.Topics_ } func (m *Msg) Broker() string { - return m.B + return m.Broker_ } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index d5324246..8bcffb63 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -71,47 +71,47 @@ type Plugin struct { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { const op = errors.Op("http_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, err) } - err = s.cfg.InitDefaults() + err = p.cfg.InitDefaults() if err != nil { return errors.E(op, err) } // rr logger (via plugin) - s.log = rrLogger + p.log = rrLogger // use time and date in UTC format - s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) + p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) - s.mdwr = make(map[string]Middleware) + p.mdwr = make(map[string]Middleware) - if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() { + if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() { return errors.E(op, errors.Disabled) } // init if nil - if s.cfg.Env == nil { - s.cfg.Env = make(map[string]string) + if p.cfg.Env == nil { + p.cfg.Env = make(map[string]string) } - s.cfg.Env[RrMode] = "http" - s.server = server + p.cfg.Env[RrMode] = "http" + p.server = server return nil } -func (s *Plugin) logCallback(event interface{}) { +func (p *Plugin) logCallback(event interface{}) { if ev, ok := event.(handler.ResponseEvent); ok { - s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), + p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), "remote", ev.Request.RemoteAddr, "elapsed", ev.Elapsed().String(), ) @@ -119,60 +119,60 @@ func (s *Plugin) logCallback(event interface{}) { } // Serve serves the svc. -func (s *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 2) // run whole process in the goroutine go func() { // protect http initialization - s.Lock() - s.serve(errCh) - s.Unlock() + p.Lock() + p.serve(errCh) + p.Unlock() }() return errCh } -func (s *Plugin) serve(errCh chan error) { +func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { errCh <- errors.E(op, err) return } - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) if err != nil { errCh <- errors.E(op, err) return } - s.handler.AddListener(s.logCallback) + p.handler.AddListener(p.logCallback) - if s.cfg.EnableHTTP() { - if s.cfg.EnableH2C() { - s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog} + if p.cfg.EnableHTTP() { + if p.cfg.EnableH2C() { + p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog} } else { - s.http = &http.Server{Handler: s, ErrorLog: s.stdLog} + p.http = &http.Server{Handler: p, ErrorLog: p.stdLog} } } - if s.cfg.EnableTLS() { - s.https = s.initSSL() - if s.cfg.SSLConfig.RootCA != "" { - err = s.appendRootCa() + if p.cfg.EnableTLS() { + p.https = p.initSSL() + if p.cfg.SSLConfig.RootCA != "" { + err = p.appendRootCa() if err != nil { errCh <- errors.E(op, err) return @@ -180,102 +180,102 @@ func (s *Plugin) serve(errCh chan error) { } // if HTTP2Config not nil - if s.cfg.HTTP2Config != nil { - if err := s.initHTTP2(); err != nil { + if p.cfg.HTTP2Config != nil { + if err := p.initHTTP2(); err != nil { errCh <- errors.E(op, err) return } } } - if s.cfg.EnableFCGI() { - s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog} + if p.cfg.EnableFCGI() { + p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog} } // start http, https and fcgi servers if requested in the config go func() { - s.serveHTTP(errCh) + p.serveHTTP(errCh) }() go func() { - s.serveHTTPS(errCh) + p.serveHTTPS(errCh) }() go func() { - s.serveFCGI(errCh) + p.serveFCGI(errCh) }() } // Stop stops the http. -func (s *Plugin) Stop() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Stop() error { + p.Lock() + defer p.Unlock() var err error - if s.fcgi != nil { - err = s.fcgi.Shutdown(context.Background()) + if p.fcgi != nil { + err = p.fcgi.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the fcgi server", "error", err) + p.log.Error("error shutting down the fcgi server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } - if s.https != nil { - err = s.https.Shutdown(context.Background()) + if p.https != nil { + err = p.https.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the https server", "error", err) + p.log.Error("error shutting down the https server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } - if s.http != nil { - err = s.http.Shutdown(context.Background()) + if p.http != nil { + err = p.http.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the http server", "error", err) + p.log.Error("error shutting down the http server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } // check for safety - if s.pool != nil { - s.pool.Destroy(context.Background()) + if p.pool != nil { + p.pool.Destroy(context.Background()) } return err } // ServeHTTP handles connection using set of middleware and pool PSR-7 server. -func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if headerContainsUpgrade(r) { http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) return } - if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect { - s.redirect(w, r) + if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect { + p.redirect(w, r) return } - if s.https != nil && r.TLS != nil { + if p.https != nil && r.TLS != nil { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } r = attributes.Init(r) // protect the case, when user sendEvent Reset and we are replacing handler with pool - s.RLock() - s.handler.ServeHTTP(w, r) - s.RUnlock() + p.RLock() + p.handler.ServeHTTP(w, r) + p.RUnlock() } // Workers returns slice with the process states for the workers -func (s *Plugin) Workers() []process.State { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Workers() []process.State { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() ps := make([]process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { @@ -290,74 +290,74 @@ func (s *Plugin) Workers() []process.State { } // internal -func (s *Plugin) workers() []worker.BaseProcess { - return s.pool.Workers() +func (p *Plugin) workers() []worker.BaseProcess { + return p.pool.Workers() } // Name returns endure.Named interface implementation -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Reset destroys the old pool and replaces it with new one, waiting for old pool to die -func (s *Plugin) Reset() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Reset() error { + p.Lock() + defer p.Unlock() const op = errors.Op("http_plugin_reset") - s.log.Info("HTTP plugin got restart request. Restarting...") - s.pool.Destroy(context.Background()) - s.pool = nil + p.log.Info("HTTP plugin got restart request. Restarting...") + p.pool.Destroy(context.Background()) + p.pool = nil var err error - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { return errors.E(op, err) } - s.log.Info("HTTP workers Pool successfully restarted") + p.log.Info("HTTP workers Pool successfully restarted") - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) if err != nil { return errors.E(op, err) } - s.log.Info("HTTP handler listeners successfully re-added") - s.handler.AddListener(s.logCallback) + p.log.Info("HTTP handler listeners successfully re-added") + p.handler.AddListener(p.logCallback) - s.log.Info("HTTP plugin successfully restarted") + p.log.Info("HTTP plugin successfully restarted") return nil } // Collects collecting http middlewares -func (s *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - s.AddMiddleware, + p.AddMiddleware, } } // AddMiddleware is base requirement for the middleware (name and Middleware) -func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { - s.mdwr[name.Name()] = m +func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) { + p.mdwr[name.Name()] = m } // Status return status of the particular plugin -func (s *Plugin) Status() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Status() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { if workers[i].State().IsActive() { return status.Status{ @@ -372,14 +372,14 @@ func (s *Plugin) Status() status.Status { } // Ready return readiness status of the particular plugin -func (s *Plugin) Ready() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Ready() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) - // we assume, that plugin's worker pool is ready + // we assume, that plugin'p worker pool is ready if workers[i].State().Value() == worker.StateReady { return status.Status{ Code: http.StatusOK, @@ -393,4 +393,4 @@ func (s *Plugin) Ready() status.Status { } // Available interface implementation -func (s *Plugin) Available() {} +func (p *Plugin) Available() {} diff --git a/plugins/http/serve.go b/plugins/http/serve.go index 734860f5..bf1ccafe 100644 --- a/plugins/http/serve.go +++ b/plugins/http/serve.go @@ -17,46 +17,46 @@ import ( "golang.org/x/sys/cpu" ) -func (s *Plugin) serveHTTP(errCh chan error) { - if s.http == nil { +func (p *Plugin) serveHTTP(errCh chan error) { + if p.http == nil { return } const op = errors.Op("serveHTTP") - if len(s.mdwr) > 0 { - applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log) + if len(p.mdwr) > 0 { + applyMiddlewares(p.http, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.Address) + l, err := utils.CreateListener(p.cfg.Address) if err != nil { errCh <- errors.E(op, err) return } - err = s.http.Serve(l) + err = p.http.Serve(l) if err != nil && err != http.ErrServerClosed { errCh <- errors.E(op, err) return } } -func (s *Plugin) serveHTTPS(errCh chan error) { - if s.https == nil { +func (p *Plugin) serveHTTPS(errCh chan error) { + if p.https == nil { return } const op = errors.Op("serveHTTPS") - if len(s.mdwr) > 0 { - applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) + if len(p.mdwr) > 0 { + applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.SSLConfig.Address) + l, err := utils.CreateListener(p.cfg.SSLConfig.Address) if err != nil { errCh <- errors.E(op, err) return } - err = s.https.ServeTLS( + err = p.https.ServeTLS( l, - s.cfg.SSLConfig.Cert, - s.cfg.SSLConfig.Key, + p.cfg.SSLConfig.Cert, + p.cfg.SSLConfig.Key, ) if err != nil && err != http.ErrServerClosed { @@ -66,34 +66,34 @@ func (s *Plugin) serveHTTPS(errCh chan error) { } // serveFCGI starts FastCGI server. -func (s *Plugin) serveFCGI(errCh chan error) { - if s.fcgi == nil { +func (p *Plugin) serveFCGI(errCh chan error) { + if p.fcgi == nil { return } const op = errors.Op("serveFCGI") - if len(s.mdwr) > 0 { - applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) + if len(p.mdwr) > 0 { + applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log) } - l, err := utils.CreateListener(s.cfg.FCGIConfig.Address) + l, err := utils.CreateListener(p.cfg.FCGIConfig.Address) if err != nil { errCh <- errors.E(op, err) return } - err = fcgi.Serve(l, s.fcgi.Handler) + err = fcgi.Serve(l, p.fcgi.Handler) if err != nil && err != http.ErrServerClosed { errCh <- errors.E(op, err) return } } -func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) { +func (p *Plugin) redirect(w http.ResponseWriter, r *http.Request) { target := &url.URL{ Scheme: HTTPSScheme, // host or host:port - Host: s.tlsAddr(r.Host, false), + Host: p.tlsAddr(r.Host, false), Path: r.URL.Path, RawQuery: r.URL.RawQuery, } @@ -111,7 +111,7 @@ func headerContainsUpgrade(r *http.Request) bool { } // append RootCA to the https server TLS config -func (s *Plugin) appendRootCa() error { +func (p *Plugin) appendRootCa() error { const op = errors.Op("http_plugin_append_root_ca") rootCAs, err := x509.SystemCertPool() if err != nil { @@ -121,7 +121,7 @@ func (s *Plugin) appendRootCa() error { rootCAs = x509.NewCertPool() } - CA, err := os.ReadFile(s.cfg.SSLConfig.RootCA) + CA, err := os.ReadFile(p.cfg.SSLConfig.RootCA) if err != nil { return err } @@ -137,13 +137,13 @@ func (s *Plugin) appendRootCa() error { InsecureSkipVerify: false, RootCAs: rootCAs, } - s.http.TLSConfig = cfg + p.http.TLSConfig = cfg return nil } // Init https server -func (s *Plugin) initSSL() *http.Server { +func (p *Plugin) initSSL() *http.Server { var topCipherSuites []uint16 var defaultCipherSuitesTLS13 []uint16 @@ -193,9 +193,9 @@ func (s *Plugin) initSSL() *http.Server { DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) sslServer := &http.Server{ - Addr: s.tlsAddr(s.cfg.Address, true), - Handler: s, - ErrorLog: s.stdLog, + Addr: p.tlsAddr(p.cfg.Address, true), + Handler: p, + ErrorLog: p.stdLog, TLSConfig: &tls.Config{ CurvePreferences: []tls.CurveID{ tls.CurveP256, @@ -213,19 +213,19 @@ func (s *Plugin) initSSL() *http.Server { } // init http/2 server -func (s *Plugin) initHTTP2() error { - return http2.ConfigureServer(s.https, &http2.Server{ - MaxConcurrentStreams: s.cfg.HTTP2Config.MaxConcurrentStreams, +func (p *Plugin) initHTTP2() error { + return http2.ConfigureServer(p.https, &http2.Server{ + MaxConcurrentStreams: p.cfg.HTTP2Config.MaxConcurrentStreams, }) } // tlsAddr replaces listen or host port with port configured by SSLConfig config. -func (s *Plugin) tlsAddr(host string, forcePort bool) string { +func (p *Plugin) tlsAddr(host string, forcePort bool) string { // remove current forcePort first host = strings.Split(host, ":")[0] - if forcePort || s.cfg.SSLConfig.Port != 443 { - host = fmt.Sprintf("%s:%v", host, s.cfg.SSLConfig.Port) + if forcePort || p.cfg.SSLConfig.Port != 443 { + host = fmt.Sprintf("%s:%v", host, p.cfg.SSLConfig.Port) } return host diff --git a/plugins/memory/config.go b/plugins/memory/config.go deleted file mode 100644 index 08dd9fc3..00000000 --- a/plugins/memory/config.go +++ /dev/null @@ -1,8 +0,0 @@ -package memory - -// Config for the memory driver is empty, it's just a placeholder -type Config struct { - Path string `mapstructure:"path"` -} - -func (c *Config) InitDefaults() {} diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go deleted file mode 100644 index 5a96e773..00000000 --- a/plugins/memory/driver.go +++ /dev/null @@ -1,28 +0,0 @@ -package memory - -import ( - "github.com/spiral/roadrunner/v2/plugins/memory/bst" -) - -type Driver struct { - tree bst.Storage -} - -func NewInMemoryDriver() bst.Storage { - b := &Driver{ - tree: bst.NewBST(), - } - return b -} - -func (d *Driver) Insert(uuid string, topic string) { - d.tree.Insert(uuid, topic) -} - -func (d *Driver) Remove(uuid, topic string) { - d.tree.Remove(uuid, topic) -} - -func (d *Driver) Get(topic string) map[string]struct{} { - return d.tree.Get(topic) -} diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 5efd5522..2ad041aa 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -1,9 +1,9 @@ package memory import ( - "github.com/spiral/errors" + "sync" + "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -13,28 +13,16 @@ const ( type Plugin struct { log logger.Logger - cfg *Config -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("memory_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - p.log = log - return nil -} -func (p *Plugin) Serve() chan error { - const op = errors.Op("memory_plugin_serve") - errCh := make(chan error) - - return errCh + // channel with the messages from the RPC + pushCh chan pubsub.Message + // user-subscribed topics + topics sync.Map } -func (p *Plugin) Stop() error { +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + p.pushCh = make(chan pubsub.Message, 100) return nil } @@ -47,21 +35,42 @@ func (p *Plugin) Name() string { } func (p *Plugin) Publish(messages []pubsub.Message) error { - panic("implement me") + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + return nil } func (p *Plugin) PublishAsync(messages []pubsub.Message) { - panic("implement me") + go func() { + for i := 0; i < len(messages); i++ { + p.pushCh <- messages[i] + } + }() } func (p *Plugin) Subscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Store(topics[i], struct{}{}) + } + return nil } func (p *Plugin) Unsubscribe(topics ...string) error { - panic("implement me") + for i := 0; i < len(topics); i++ { + p.topics.Delete(topics[i]) + } + return nil } func (p *Plugin) Next() (pubsub.Message, error) { - panic("implement me") + msg := <-p.pushCh + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(msg.Topics()); i++ { + if _, ok := p.topics.Load(msg.Topics()[i]); ok { + return msg, nil + } + } + return nil, nil } diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 29016720..8e924b2d 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -56,9 +56,9 @@ func (fi *FanIn) AddChannel(topics ...string) error { func (fi *FanIn) read() { for { select { - //here we receive message from us (which we sent before in Publish) - //it should be compatible with the websockets.Msg interface - //payload should be in the redis.message.payload field + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the websockets.Msg interface + // payload should be in the redis.message.payload field case msg, ok := <-fi.pubsub.Channel(): // channel closed diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go index 5eb30c61..2b847173 100644 --- a/plugins/websockets/connection/connection.go +++ b/plugins/websockets/connection/connection.go @@ -43,8 +43,6 @@ func (c *Connection) Write(mt int, data []byte) error { } func (c *Connection) Read() (int, []byte, error) { - //c.RLock() - //defer c.RUnlock() const op = errors.Op("websocket_read") mt, data, err := c.conn.ReadMessage() diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 391c9a8c..9ef5e40a 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,11 +22,11 @@ type Executor struct { // associated connection ID connID string - pubsub pubsub.PubSub + pubsub map[string]pubsub.PubSub } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor { return &Executor{ conn: conn, connID: connID, @@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error { switch msg.Command() { // handle leave case commands.Join: - // TODO access validators model update - //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) - //// validation error - //if err != nil { - // e.log.Error("validation error", "error", err) - // - // resp := &Response{ - // Topic: "#join", - // Payload: msg.Topics(), - // } - // - // packet, err := json.Marshal(resp) - // if err != nil { - // e.log.Error("error marshal the body", "error", err) - // return err - // } - // - // err = e.conn.Write(websocket.BinaryMessage, packet) - // if err != nil { - // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) - // continue - // } - // - // continue - //} // associate connection with topics e.storage.Store(e.connID, msg.Topics()) @@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Subscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue @@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Unsubscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index a247da69..bc5028e6 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -58,28 +58,25 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { func (p *Plugin) Serve() chan error { errCh := make(chan error) - go func() { - ps := p.pubsubs["redis"] - - for { - // get message - // get topic - // get connection uuid from the storage by the topic - // write payload into the connection - // do that in the workers pool - data, err := ps.Next() - if err != nil { - errCh <- err - return - } - if data == nil { - continue - } + // run all pubsubs drivers + for _, v := range p.pubsubs { + go func(ps pubsub.PubSub) { + for { + data, err := ps.Next() + if err != nil { + errCh <- err + return + } - p.workersPool.Queue(data) - } - }() + if data == nil { + continue + } + + p.workersPool.Queue(data) + } + }(v) + } return errCh } @@ -156,7 +153,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // Executor wraps a connection to have a safe abstraction p.Lock() - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"]) + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs) p.Unlock() p.log.Info("websocket client connected", "uuid", connectionID) @@ -169,6 +166,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { }) } +// Publish is an entry point to the websocket PUBSUB func (p *Plugin) Publish(msg []pubsub.Message) error { p.Lock() defer p.Unlock() @@ -199,7 +197,6 @@ func (p *Plugin) PublishAsync(msg []pubsub.Message) { p.log.Error("publish async error", "error", err) return } - } } }() diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go index a7e49207..34f53cfd 100644 --- a/plugins/websockets/storage/storage.go +++ b/plugins/websockets/storage/storage.go @@ -3,7 +3,7 @@ package storage import ( "sync" - "github.com/spiral/roadrunner/v2/plugins/memory/bst" + "github.com/spiral/roadrunner/v2/pkg/bst" ) type Storage struct { diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go index 1fa29783..f6533dc4 100644 --- a/tests/plugins/http/handler_test.go +++ b/tests/plugins/http/handler_test.go @@ -12,7 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go index 32738ae0..d75620f3 100644 --- a/tests/plugins/http/parse_test.go +++ b/tests/plugins/http/parse_test.go @@ -3,7 +3,7 @@ package http import ( "testing" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" ) var samples = []struct { diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go index 5b72df40..276c22ef 100644 --- a/tests/plugins/http/response_test.go +++ b/tests/plugins/http/response_test.go @@ -7,7 +7,7 @@ import ( "testing" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/stretchr/testify/assert" ) diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go index 82843d4e..903a930a 100644 --- a/tests/plugins/http/uploads_test.go +++ b/tests/plugins/http/uploads_test.go @@ -18,7 +18,7 @@ import ( j "github.com/json-iterator/go" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/worker_handler" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" "github.com/spiral/roadrunner/v2/plugins/http/config" "github.com/stretchr/testify/assert" ) |