summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linters.yml2
-rwxr-xr-x.gitignore1
-rwxr-xr-x.golangci.yml9
-rw-r--r--go.mod8
-rw-r--r--pkg/pool/interface.go2
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/pubsub/interface.go38
-rw-r--r--pkg/pubsub/message.go64
-rw-r--r--pkg/worker_handler/constants.go (renamed from plugins/http/worker_handler/constants.go)0
-rw-r--r--pkg/worker_handler/errors.go (renamed from plugins/http/worker_handler/errors.go)0
-rw-r--r--pkg/worker_handler/errors_windows.go (renamed from plugins/http/worker_handler/errors_windows.go)0
-rw-r--r--pkg/worker_handler/handler.go (renamed from plugins/http/worker_handler/handler.go)2
-rw-r--r--pkg/worker_handler/parse.go (renamed from plugins/http/worker_handler/parse.go)0
-rw-r--r--pkg/worker_handler/request.go (renamed from plugins/http/worker_handler/request.go)0
-rw-r--r--pkg/worker_handler/response.go (renamed from plugins/http/worker_handler/response.go)0
-rw-r--r--pkg/worker_handler/uploads.go (renamed from plugins/http/worker_handler/uploads.go)0
-rw-r--r--pkg/worker_watcher/interface.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go6
-rw-r--r--plugins/config/interface.go4
-rw-r--r--plugins/gzip/plugin.go5
-rw-r--r--plugins/http/plugin.go2
-rw-r--r--plugins/http/serve.go6
-rw-r--r--plugins/kv/drivers/boltdb/driver.go3
-rw-r--r--plugins/kv/drivers/memcached/driver.go3
-rw-r--r--plugins/kv/drivers/memory/driver.go3
-rw-r--r--plugins/kv/drivers/redis/plugin.go2
-rw-r--r--plugins/kv/storage.go4
-rw-r--r--plugins/logger/std_log_adapter.go9
-rw-r--r--plugins/memory/bst/bst.go136
-rw-r--r--plugins/memory/bst/bst_test.go37
-rw-r--r--plugins/memory/bst/interface.go11
-rw-r--r--plugins/memory/config.go8
-rw-r--r--plugins/memory/driver.go28
-rw-r--r--plugins/memory/plugin.go67
-rw-r--r--plugins/redis/fanin.go100
-rw-r--r--plugins/redis/plugin.go134
-rw-r--r--plugins/server/plugin.go4
-rw-r--r--plugins/websockets/commands/enums.go9
-rw-r--r--plugins/websockets/config.go67
-rw-r--r--plugins/websockets/connection/connection.go69
-rw-r--r--plugins/websockets/doc/broadcast.drawio1
-rw-r--r--plugins/websockets/doc/broadcast_arch.drawio1
-rw-r--r--plugins/websockets/doc/ws.drawio1
-rw-r--r--plugins/websockets/executor/executor.go146
-rw-r--r--plugins/websockets/plugin.go206
-rw-r--r--plugins/websockets/pool/workers_pool.go104
-rw-r--r--plugins/websockets/rpc.go47
-rw-r--r--plugins/websockets/storage/storage.go50
-rw-r--r--plugins/websockets/validator/access_validator.go102
-rw-r--r--plugins/websockets/validator/access_validator_test.go35
-rw-r--r--tests/Dockerfile0
-rw-r--r--tests/docker-compose.yaml3
-rw-r--r--tests/plugins/http/handler_test.go2
-rw-r--r--tests/plugins/http/parse_test.go2
-rw-r--r--tests/plugins/http/response_test.go2
-rw-r--r--tests/plugins/http/uploads_test.go2
-rw-r--r--tests/plugins/informer/.rr-informer.yaml6
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml50
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go102
-rwxr-xr-xutils/network.go3
61 files changed, 1638 insertions, 80 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 82072675..24d839e5 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -13,6 +13,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
- version: v1.39 # without patch version
+ version: v1.41 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m
diff --git a/.gitignore b/.gitignore
index 9a9a07b6..beb3f885 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ tests/vendor/
.rr-sample.yaml
cmd
rr
+**/old
diff --git a/.golangci.yml b/.golangci.yml
index bfc69f57..41e1d68f 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -14,8 +14,10 @@ output:
linters-settings:
govet:
check-shadowing: true
- golint:
- min-confidence: 0.1
+ revive:
+ confidence: 0.8
+ errorCode: 0
+ warningCode: 0
gocyclo:
min-complexity: 15
godot:
@@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- gocritic # The most opinionated Go source code linter
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
+ - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
- goprintffuncname # Checks that printf-like functions are named with `f` at the end
- gosec # Inspects source code for security problems
- gosimple # Linter for Go source code that specializes in simplifying a code
@@ -88,3 +90,4 @@ issues:
- goconst
- noctx
- gosimple
+ - revive
diff --git a/go.mod b/go.mod
index dc2cf5ba..ec05039a 100644
--- a/go.mod
+++ b/go.mod
@@ -14,10 +14,13 @@ require (
github.com/gofiber/fiber/v2 v2.10.0
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
+ github.com/google/uuid v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
+ github.com/klauspost/compress v1.12.2 // indirect
github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/client_golang v1.10.0
+ github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/spf13/viper v1.7.1
// SPIRAL ====
@@ -27,12 +30,13 @@ require (
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.4 // indirect
- github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
+ github.com/valyala/fasthttp v1.24.0 // indirect
+ github.com/valyala/tcplisten v1.0.0
github.com/yookoala/gofast v0.6.0
go.etcd.io/bbolt v1.3.5
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.17.0
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
- golang.org/x/sys v0.0.0-20210309074719-68d13333faf2
+ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
)
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4ef2f2e7..c22fbbd3 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -18,7 +18,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
- // Remove worker from the pool.
+ // RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d57cc95c..8c9d69b9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:golint,stylecheck
+ err_encoder ErrorEncoder //nolint:stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 40903db3..ca61dbc4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -15,7 +15,7 @@ import (
const MB = 1024 * 1024
// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
type Supervised interface {
Pool
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..80dab0c3
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,38 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (Message, error)
+}
+
+type Message interface {
+ Command() string
+ Payload() []byte
+ Topics() []string
+ Broker() string
+}
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..ab74eb98
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,64 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Msg struct {
+ // Topic message been pushed into.
+ T []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ C string `json:"command"`
+
+ // Broker (redis, memory)
+ B string `json:"broker"`
+
+ // Payload to be broadcasted
+ P []byte `json:"payload"`
+}
+
+//func (m Msg) UnmarshalBinary(data []byte) error {
+// //Use default gob decoder
+// reader := bytes.NewReader(data)
+// dec := gob.NewDecoder(reader)
+// if err := dec.Decode(&m); err != nil {
+// return err
+// }
+//
+// return nil
+//}
+
+func (m *Msg) MarshalBinary() ([]byte, error) {
+ //buf := new(bytes.Buffer)
+ //
+ //for i := 0; i < len(m.T); i++ {
+ // buf.WriteString(m.T[i])
+ //}
+ //
+ //buf.WriteString(m.C)
+ //buf.WriteString(m.B)
+ //buf.Write(m.P)
+
+ return json.Marshal(m)
+
+}
+
+// Payload in raw bytes
+func (m *Msg) Payload() []byte {
+ return m.P
+}
+
+// Command for the connection
+func (m *Msg) Command() string {
+ return m.C
+}
+
+// Topics to subscribe
+func (m *Msg) Topics() []string {
+ return m.T
+}
+
+func (m *Msg) Broker() string {
+ return m.B
+}
diff --git a/plugins/http/worker_handler/constants.go b/pkg/worker_handler/constants.go
index 3355d9c2..3355d9c2 100644
--- a/plugins/http/worker_handler/constants.go
+++ b/pkg/worker_handler/constants.go
diff --git a/plugins/http/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..5fa8e64e 100644
--- a/plugins/http/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
diff --git a/plugins/http/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..390cc7d1 100644
--- a/plugins/http/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
diff --git a/plugins/http/worker_handler/handler.go b/pkg/worker_handler/handler.go
index be53fc12..d98cdef0 100644
--- a/plugins/http/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serve_http")
start := time.Now()
// validating request size
diff --git a/plugins/http/worker_handler/parse.go b/pkg/worker_handler/parse.go
index 2790da2a..2790da2a 100644
--- a/plugins/http/worker_handler/parse.go
+++ b/pkg/worker_handler/parse.go
diff --git a/plugins/http/worker_handler/request.go b/pkg/worker_handler/request.go
index 178bc827..178bc827 100644
--- a/plugins/http/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
diff --git a/plugins/http/worker_handler/response.go b/pkg/worker_handler/response.go
index 1763d304..1763d304 100644
--- a/plugins/http/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
diff --git a/plugins/http/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
index e695000e..e695000e 100644
--- a/plugins/http/worker_handler/uploads.go
+++ b/pkg/worker_handler/uploads.go
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 4625b7a7..29fa3640 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -23,9 +23,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all container w/o removing it from internal storage
+ // List return all container w/o removing it from internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the container
+ // Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5aec4ee6..557563ac 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -11,7 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead container will not be replaced
+// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
@@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation, and it will return copy of the actual workers
+// List - this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 59ad981f..b3854e09 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -11,7 +11,7 @@ type Configurer interface {
// }
UnmarshalKey(name string, out interface{}) error
- // Unmarshal unmarshals the config into a Struct. Make sure that the tags
+ // Unmarshal unmarshal the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
Unmarshal(out interface{}) error
@@ -24,6 +24,6 @@ type Configurer interface {
// Has checks if config section exists.
Has(name string) bool
- // Returns General section. Read-only
+ // GetCommonConfig returns General section. Read-only
GetCommonConfig() *General
}
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go
index 24b125fb..133704f8 100644
--- a/plugins/gzip/plugin.go
+++ b/plugins/gzip/plugin.go
@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/NYTimes/gziphandler"
+ "github.com/gofiber/fiber/v2"
)
const PluginName = "gzip"
@@ -21,6 +22,10 @@ func (g *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+func (g *Plugin) FiberMiddleware(ctx fiber.Ctx) {
+
+}
+
// Available interface implementation
func (g *Plugin) Available() {}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2b68bbe5..d5324246 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -13,10 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/status"
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 78796322..734860f5 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -21,7 +21,7 @@ func (s *Plugin) serveHTTP(errCh chan error) {
if s.http == nil {
return
}
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serveHTTP")
if len(s.mdwr) > 0 {
applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
@@ -43,7 +43,7 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
if s.https == nil {
return
}
- const op = errors.Op("http_plugin_serve_https")
+ const op = errors.Op("serveHTTPS")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
}
@@ -70,7 +70,7 @@ func (s *Plugin) serveFCGI(errCh chan error) {
if s.fcgi == nil {
return
}
- const op = errors.Op("http_plugin_serve_fcgi")
+ const op = errors.Op("serveFCGI")
if len(s.mdwr) > 0 {
applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 2e2df527..0f647cb1 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
@@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- err := b.Delete([]byte(k))
+ err := b.Delete(utils.AsBytes(k))
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 17b06fa0..02281ed5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: []byte(items[i].Value),
+ Value: utils.AsBytes(items[i].Value),
Flags: 0,
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 1e0d03d4..c2494ee7 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(kv.Item).Value), nil
}
return nil, nil
}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
index d2183411..3694c5a7 100644
--- a/plugins/kv/drivers/redis/plugin.go
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
// Serve is noop here
func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
+ return make(chan error)
}
func (s *Plugin) Stop() error {
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index fe2fa10b..9a609735 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -90,9 +90,9 @@ func (p *Plugin) Serve() chan error {
return errCh
}
- // config key for the particular sub-driver
+ // config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
- // at this point we know, that driver field present in the cofiguration
+ // at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.drivers[memcached]; !ok {
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index 484cc23e..479aa565 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -1,7 +1,7 @@
package logger
import (
- "unsafe"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output
@@ -12,7 +12,7 @@ type StdLogAdapter struct {
// Write io.Writer interface implementation
func (s *StdLogAdapter) Write(p []byte) (n int, err error) {
- s.log.Error("server internal error", "message", toString(p))
+ s.log.Error("server internal error", "message", utils.AsString(p))
return len(p), nil
}
@@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter {
return logAdapter
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
diff --git a/plugins/memory/bst/bst.go b/plugins/memory/bst/bst.go
new file mode 100644
index 00000000..3060ff11
--- /dev/null
+++ b/plugins/memory/bst/bst.go
@@ -0,0 +1,136 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{}
+}
+
+// Insert uuid to the topic
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
+
+ for {
+ if curr.topic == topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if curr.topic < topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
+ for curr != nil {
+ if curr.topic == topic {
+ return curr.uuids
+ }
+ if curr.topic < topic {
+ curr = curr.left
+ continue
+ }
+ if curr.topic > topic {
+ curr = curr.right
+ continue
+ }
+ }
+
+ return nil
+}
+
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
+}
+
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
+ for curr != nil {
+ if topic < curr.topic {
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil {
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil {
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else {
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
+ }
+ return b.left.traverseForMinString()
+}
diff --git a/plugins/memory/bst/bst_test.go b/plugins/memory/bst/bst_test.go
new file mode 100644
index 00000000..e8a13760
--- /dev/null
+++ b/plugins/memory/bst/bst_test.go
@@ -0,0 +1,37 @@
+package bst
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ // should be 100
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ // should be 100
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ // should be 100
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
diff --git a/plugins/memory/bst/interface.go b/plugins/memory/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/plugins/memory/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/plugins/memory/config.go b/plugins/memory/config.go
new file mode 100644
index 00000000..08dd9fc3
--- /dev/null
+++ b/plugins/memory/config.go
@@ -0,0 +1,8 @@
+package memory
+
+// Config for the memory driver is empty, it's just a placeholder
+type Config struct {
+ Path string `mapstructure:"path"`
+}
+
+func (c *Config) InitDefaults() {}
diff --git a/plugins/memory/driver.go b/plugins/memory/driver.go
new file mode 100644
index 00000000..5a96e773
--- /dev/null
+++ b/plugins/memory/driver.go
@@ -0,0 +1,28 @@
+package memory
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Driver struct {
+ tree bst.Storage
+}
+
+func NewInMemoryDriver() bst.Storage {
+ b := &Driver{
+ tree: bst.NewBST(),
+ }
+ return b
+}
+
+func (d *Driver) Insert(uuid string, topic string) {
+ d.tree.Insert(uuid, topic)
+}
+
+func (d *Driver) Remove(uuid, topic string) {
+ d.tree.Remove(uuid, topic)
+}
+
+func (d *Driver) Get(topic string) map[string]struct{} {
+ return d.tree.Get(topic)
+}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..5efd5522
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,67 @@
+package memory
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("memory_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("memory_plugin_serve")
+ errCh := make(chan error)
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Publish(messages []pubsub.Message) error {
+ panic("implement me")
+}
+
+func (p *Plugin) PublishAsync(messages []pubsub.Message) {
+ panic("implement me")
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ panic("implement me")
+}
+
+func (p *Plugin) Next() (pubsub.Message, error) {
+ panic("implement me")
+}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..29016720
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ //here we receive message from us (which we sent before in Publish)
+ //it should be compatible with the websockets.Msg interface
+ //payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Msg{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan pubsub.Message {
+ return fi.out
+}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 2eab7043..24ed1f92 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,73 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ef77f7ab..aab9dcde 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,9 +25,9 @@ import (
const PluginName = "server"
// RR_RELAY env variable key (internal)
-const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+const RR_RELAY = "RR_RELAY" //nolint:stylecheck
// RR_RPC env variable key (internal) if the RPC presents
-const RR_RPC = "RR_RPC" //nolint:golint,stylecheck
+const RR_RPC = "RR_RPC" //nolint:stylecheck
// Plugin manages worker
type Plugin struct {
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
new file mode 100644
index 00000000..18c63be3
--- /dev/null
+++ b/plugins/websockets/commands/enums.go
@@ -0,0 +1,9 @@
+package commands
+
+type Command string
+
+const (
+ Leave string = "leave"
+ Join string = "join"
+ Headers string = "headers"
+)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
new file mode 100644
index 00000000..f3cb8e12
--- /dev/null
+++ b/plugins/websockets/config.go
@@ -0,0 +1,67 @@
+package websockets
+
+import "time"
+
+/*
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs:["redis", "amqp", "memory"]
+
+ # sample of the own config section for the redis pubsub driver
+ redis:
+ address:
+ - localhost:1111
+ .... the rest
+
+
+ # path used as websockets path
+ path: "/ws"
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"path"`
+ // ["redis", "amqp", "memory"]
+ PubSubs []string `mapstructure:"pubsubs"`
+ Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+}
+
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+ if len(c.PubSubs) == 0 {
+ // memory used by default
+ c.PubSubs = append(c.PubSubs, "memory")
+ }
+}
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
new file mode 100644
index 00000000..5eb30c61
--- /dev/null
+++ b/plugins/websockets/connection/connection.go
@@ -0,0 +1,69 @@
+package connection
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Connection represents wrapped and safe to use from the different threads websocket connection
+type Connection struct {
+ sync.RWMutex
+ log logger.Logger
+ conn *websocket.Conn
+}
+
+func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
+ return &Connection{
+ conn: wsConn,
+ log: log,
+ }
+}
+
+func (c *Connection) Write(mt int, data []byte) error {
+ c.Lock()
+ defer c.Unlock()
+
+ const op = errors.Op("websocket_write")
+ // handle a case when a goroutine tried to write into the closed connection
+ defer func() {
+ if r := recover(); r != nil {
+ c.log.Warn("panic handled, tried to write into the closed connection")
+ }
+ }()
+
+ err := c.conn.WriteMessage(mt, data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *Connection) Read() (int, []byte, error) {
+ //c.RLock()
+ //defer c.RUnlock()
+ const op = errors.Op("websocket_read")
+
+ mt, data, err := c.conn.ReadMessage()
+ if err != nil {
+ return -1, nil, errors.E(op, err)
+ }
+
+ return mt, data, nil
+}
+
+func (c *Connection) Close() error {
+ c.Lock()
+ defer c.Unlock()
+ const op = errors.Op("websocket_close")
+
+ err := c.conn.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
new file mode 100644
index 00000000..748fec45
--- /dev/null
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..54bba9c7
--- /dev/null
+++ b/plugins/websockets/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio
new file mode 100644
index 00000000..739b797a
--- /dev/null
+++ b/plugins/websockets/doc/ws.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..391c9a8c
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,146 @@
+package executor
+
+import (
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+ pubsub pubsub.PubSub
+}
+
+// NewExecutor creates protected connection and starts command loop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error {
+ for {
+ mt, data, err := e.conn.Read()
+ if err != nil {
+ if mt == -1 {
+ return err
+ }
+
+ return err
+ }
+
+ msg := &pubsub.Msg{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ switch msg.Command() {
+ // handle leave
+ case commands.Join:
+ // TODO access validators model update
+ //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
+ //// validation error
+ //if err != nil {
+ // e.log.Error("validation error", "error", err)
+ //
+ // resp := &Response{
+ // Topic: "#join",
+ // Payload: msg.Topics(),
+ // }
+ //
+ // packet, err := json.Marshal(resp)
+ // if err != nil {
+ // e.log.Error("error marshal the body", "error", err)
+ // return err
+ // }
+ //
+ // err = e.conn.Write(websocket.BinaryMessage, packet)
+ // if err != nil {
+ // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ // continue
+ // }
+ //
+ // continue
+ //}
+ // associate connection with topics
+ e.storage.Store(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ err = e.pubsub.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ // handle leave
+ case commands.Leave:
+ // remove associated connections from the storage
+ e.storage.Remove(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.pubsub.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ case commands.Headers:
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command())
+ }
+ }
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
new file mode 100644
index 00000000..a247da69
--- /dev/null
+++ b/plugins/websockets/plugin.go
@@ -0,0 +1,206 @@
+package websockets
+
+import (
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+const (
+ PluginName string = "websockets"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ // Collection with all available pubsubs
+ pubsubs map[string]pubsub.PubSub
+
+ Config *Config
+ log logger.Logger
+
+ // global connections map
+ connections sync.Map
+ storage *storage.Storage
+
+ workersPool *pool.WorkersPool
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("websockets_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.Config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.pubsubs = make(map[string]pubsub.PubSub)
+ p.log = log
+ p.storage = storage.NewStorage()
+ p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+ go func() {
+ ps := p.pubsubs["redis"]
+
+ for {
+ // get message
+ // get topic
+ // get connection uuid from the storage by the topic
+ // write payload into the connection
+ // do that in the workers pool
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
+
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }()
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ p.workersPool.Stop()
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetPublishers,
+ }
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// GetPublishers collects all pubsubs
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
+ p.pubsubs[name.Name()] = pub
+}
+
+func (p *Plugin) Middleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != p.Config.Path {
+ next.ServeHTTP(w, r)
+ return
+ }
+
+ // connection upgrader
+ upgraded := websocket.Upgrader{
+ HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 0,
+ WriteBufferSize: 0,
+ WriteBufferPool: nil,
+ Subprotocols: nil,
+ Error: nil,
+ CheckOrigin: nil,
+ EnableCompression: false,
+ }
+
+ // upgrade connection to websocket connection
+ _conn, err := upgraded.Upgrade(w, r, nil)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ safeConn := connection.NewConnection(_conn, p.log)
+ defer func() {
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close error", "error", err)
+ }
+ }()
+
+ // generate UUID from the connection
+ connectionID := uuid.NewString()
+ // store connection
+ p.connections.Store(connectionID, safeConn)
+ // when exiting - delete the connection
+ defer func() {
+ p.connections.Delete(connectionID)
+ }()
+
+ // Executor wraps a connection to have a safe abstraction
+ p.Lock()
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ p.Unlock()
+
+ p.log.Info("websocket client connected", "uuid", connectionID)
+
+ err = e.StartCommandLoop()
+ if err != nil {
+ p.log.Error("command loop error", "error", err.Error())
+ return
+ }
+ })
+}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker()]; ok {
+ err := br.Publish(msg)
+ if err != nil {
+ return errors.E(err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker())
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ err := p.pubsubs[msg[i].Broker()].Publish(msg)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
new file mode 100644
index 00000000..ee31d62f
--- /dev/null
+++ b/plugins/websockets/pool/workers_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type WorkersPool struct {
+ storage *storage.Storage
+ connections *sync.Map
+ resPool sync.Pool
+ log logger.Logger
+
+ queue chan pubsub.Message
+ exit chan struct{}
+}
+
+func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+ wp := &WorkersPool{
+ connections: connections,
+ queue: make(chan pubsub.Message, 100),
+ storage: storage,
+ log: log,
+ exit: make(chan struct{}),
+ }
+
+ wp.resPool.New = func() interface{} {
+ return make(map[string]struct{}, 10)
+ }
+
+ for i := 0; i < 10; i++ {
+ wp.do()
+ }
+
+ return wp
+}
+
+func (wp *WorkersPool) Queue(msg pubsub.Message) {
+ wp.queue <- msg
+}
+
+func (wp *WorkersPool) Stop() {
+ for i := 0; i < 10; i++ {
+ wp.exit <- struct{}{}
+ }
+
+ close(wp.exit)
+}
+
+func (wp *WorkersPool) put(res map[string]struct{}) {
+ // optimized
+ // https://go-review.googlesource.com/c/go/+/110055/
+ // not O(n), but O(1)
+ for k := range res {
+ delete(res, k)
+ }
+}
+
+func (wp *WorkersPool) get() map[string]struct{} {
+ return wp.resPool.Get().(map[string]struct{})
+}
+
+func (wp *WorkersPool) do() {
+ go func() {
+ for {
+ select {
+ case msg := <-wp.queue:
+ res := wp.get()
+ // get connections for the particular topic
+ wp.storage.Get(msg.Topics(), res)
+ if len(res) == 0 {
+ wp.log.Info("no such topic", "topic", msg.Topics())
+ wp.put(res)
+ continue
+ }
+
+ for i := range res {
+ c, ok := wp.connections.Load(i)
+ if !ok {
+ panic("not ok here (((")
+ }
+
+ conn := c.(*connection.Connection)
+ err := conn.Write(websocket.BinaryMessage, msg.Payload())
+ if err != nil {
+ // TODO handle error
+ wp.put(res)
+ continue
+ }
+ }
+
+ wp.put(res)
+ case <-wp.exit:
+ wp.log.Info("get exit signal, exiting from the workers pool")
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..2b3ae54e
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,47 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+ err := r.plugin.Publish(mi)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error {
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+
+ r.plugin.PublishAsync(mi)
+
+ *ok = true
+ return nil
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
new file mode 100644
index 00000000..a7e49207
--- /dev/null
+++ b/plugins/websockets/storage/storage.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Storage struct {
+ sync.RWMutex
+ BST bst.Storage
+}
+
+func NewStorage() *Storage {
+ return &Storage{
+ BST: bst.NewBST(),
+ }
+}
+
+func (s *Storage) Store(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Insert(connID, topics[i])
+ }
+}
+
+func (s *Storage) Remove(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Remove(connID, topics[i])
+ }
+}
+
+func (s *Storage) Get(topics []string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
new file mode 100644
index 00000000..9d9522d4
--- /dev/null
+++ b/plugins/websockets/validator/access_validator.go
@@ -0,0 +1,102 @@
+package validator
+
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+)
+
+type AccessValidator struct {
+ buffer *bytes.Buffer
+ header http.Header
+ status int
+}
+
+func NewValidator() *AccessValidator {
+ return &AccessValidator{
+ buffer: bytes.NewBuffer(nil),
+ header: make(http.Header),
+ }
+}
+
+// Copy all content to parent response writer.
+func (w *AccessValidator) Copy(rw http.ResponseWriter) {
+ rw.WriteHeader(w.status)
+
+ for k, v := range w.header {
+ for _, vv := range v {
+ rw.Header().Add(k, vv)
+ }
+ }
+
+ _, _ = io.Copy(rw, w.buffer)
+}
+
+// Header returns the header map that will be sent by WriteHeader.
+func (w *AccessValidator) Header() http.Header {
+ return w.header
+}
+
+// Write writes the data to the connection as part of an HTTP reply.
+func (w *AccessValidator) Write(p []byte) (int, error) {
+ return w.buffer.Write(p)
+}
+
+// WriteHeader sends an HTTP response header with the provided status code.
+func (w *AccessValidator) WriteHeader(statusCode int) {
+ w.status = statusCode
+}
+
+// IsOK returns true if response contained 200 status code.
+func (w *AccessValidator) IsOK() bool {
+ return w.status == 200
+}
+
+// Body returns response body to rely to user.
+func (w *AccessValidator) Body() []byte {
+ return w.buffer.Bytes()
+}
+
+// Error contains server response.
+func (w *AccessValidator) Error() string {
+ return w.buffer.String()
+}
+
+// AssertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in
+// case of error
+func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error {
+ if err := attributes.Set(r, "ws:joinServer", true); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinServer")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
+
+// AssertTopicsAccess checks if user can access given upstream, the application will receive all user headers and cookies.
+// the decision to authorize user will be based on response code (200).
+func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
+ if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinTopics")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go
new file mode 100644
index 00000000..4a07b00f
--- /dev/null
+++ b/plugins/websockets/validator/access_validator_test.go
@@ -0,0 +1,35 @@
+package validator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResponseWrapper_Body(t *testing.T) {
+ w := NewValidator()
+ _, _ = w.Write([]byte("hello"))
+
+ assert.Equal(t, []byte("hello"), w.Body())
+}
+
+func TestResponseWrapper_Header(t *testing.T) {
+ w := NewValidator()
+ w.Header().Set("k", "value")
+
+ assert.Equal(t, "value", w.Header().Get("k"))
+}
+
+func TestResponseWrapper_StatusCode(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(200)
+
+ assert.True(t, w.IsOK())
+}
+
+func TestResponseWrapper_StatusCodeBad(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(400)
+
+ assert.False(t, w.IsOK())
+}
diff --git a/tests/Dockerfile b/tests/Dockerfile
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/Dockerfile
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 67d5476b..a8e8a7c5 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -7,5 +7,8 @@ services:
- "0.0.0.0:11211:11211"
redis:
image: redis:6
+ mem_limit: 16384m
+ mem_reservation: 2048M
+ cpus: 8
ports:
- "6379:6379"
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 575fe656..1fa29783 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -12,8 +12,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
"net/http"
diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go
index 15c82839..32738ae0 100644
--- a/tests/plugins/http/parse_test.go
+++ b/tests/plugins/http/parse_test.go
@@ -3,7 +3,7 @@ package http
import (
"testing"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ "github.com/spiral/roadrunner/v2/pkg/worker_handler"
)
var samples = []struct {
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 3564d9cd..5b72df40 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"github.com/spiral/roadrunner/v2/pkg/payload"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index 5c39589c..82843d4e 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -18,8 +18,8 @@ import (
j "github.com/json-iterator/go"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml
index e1edbb44..94c9a856 100644
--- a/tests/plugins/informer/.rr-informer.yaml
+++ b/tests/plugins/informer/.rr-informer.yaml
@@ -3,8 +3,8 @@ server:
user: ""
group: ""
env:
- "RR_CONFIG": "/some/place/on/the/C134"
- "RR_CONFIG2": "C138"
+ - RR_CONFIG: "/some/place/on/the/C134"
+ - RR_CONFIG: "C138"
relay: "pipes"
relay_timeout: "20s"
@@ -12,4 +12,4 @@ rpc:
listen: tcp://127.0.0.1:6001
logs:
mode: development
- level: error \ No newline at end of file
+ level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
new file mode 100644
index 00000000..9973b2dc
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -0,0 +1,50 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:15395
+ max_request_size: 1024
+ middleware: ["websockets"]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs: [ "redis" ]
+
+ # sample of the own config section for the redis pubsub driver
+
+ redis:
+ addrs:
+ - localhost:1111
+ # path used as websockets path
+ path: "/ws"
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
new file mode 100644
index 00000000..a9b90fd0
--- /dev/null
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -0,0 +1,102 @@
+package websockets
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1000)
+ t.Run("test1", test1)
+ t.Run("test2", test2)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func test1(t *testing.T) {
+
+}
+
+func test2(t *testing.T) {
+
+}
diff --git a/utils/network.go b/utils/network.go
index b73363db..86a7e733 100755
--- a/utils/network.go
+++ b/utils/network.go
@@ -12,7 +12,8 @@ import (
"github.com/valyala/tcplisten"
)
-// - SO_REUSEPORT. This option allows linear scaling server performance
+// CreateListener
+// - SO_REUSEPORT. This option allows linear scaling server performance
// on multi-CPU servers.
// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details.
//