-rw-r--r--  .github/workflows/linters.yml  2
-rwxr-xr-x  .gitignore  1
-rwxr-xr-x  .golangci.yml  9
-rw-r--r--  go.mod  9
-rw-r--r--  pkg/bst/bst.go  136
-rw-r--r--  pkg/bst/bst_test.go  37
-rw-r--r--  pkg/bst/doc.go  7
-rw-r--r--  pkg/bst/interface.go  11
-rw-r--r--  pkg/pool/interface.go  2
-rwxr-xr-x  pkg/pool/static_pool.go  2
-rwxr-xr-x  pkg/pool/supervisor_pool.go  2
-rw-r--r--  pkg/pubsub/interface.go  38
-rw-r--r--  pkg/pubsub/message.go  42
-rw-r--r--  pkg/worker_handler/constants.go (renamed from plugins/http/worker_handler/constants.go)  0
-rw-r--r--  pkg/worker_handler/errors.go (renamed from plugins/http/worker_handler/errors.go)  0
-rw-r--r--  pkg/worker_handler/errors_windows.go (renamed from plugins/http/worker_handler/errors_windows.go)  0
-rw-r--r--  pkg/worker_handler/handler.go (renamed from plugins/http/worker_handler/handler.go)  2
-rw-r--r--  pkg/worker_handler/parse.go (renamed from plugins/http/worker_handler/parse.go)  0
-rw-r--r--  pkg/worker_handler/request.go (renamed from plugins/http/worker_handler/request.go)  0
-rw-r--r--  pkg/worker_handler/response.go (renamed from plugins/http/worker_handler/response.go)  0
-rw-r--r--  pkg/worker_handler/uploads.go (renamed from plugins/http/worker_handler/uploads.go)  0
-rw-r--r--  pkg/worker_watcher/interface.go  6
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go  6
-rw-r--r--  plugins/config/interface.go  4
-rw-r--r--  plugins/gzip/plugin.go  5
-rw-r--r--  plugins/http/plugin.go  226
-rw-r--r--  plugins/http/serve.go  76
-rw-r--r--  plugins/kv/drivers/boltdb/driver.go  3
-rw-r--r--  plugins/kv/drivers/memcached/driver.go  3
-rw-r--r--  plugins/kv/drivers/memory/driver.go  3
-rw-r--r--  plugins/kv/drivers/redis/plugin.go  2
-rw-r--r--  plugins/kv/storage.go  4
-rw-r--r--  plugins/logger/std_log_adapter.go  9
-rw-r--r--  plugins/memory/plugin.go  76
-rw-r--r--  plugins/redis/fanin.go  100
-rw-r--r--  plugins/redis/plugin.go  134
-rw-r--r--  plugins/server/plugin.go  4
-rw-r--r--  plugins/websockets/commands/enums.go  9
-rw-r--r--  plugins/websockets/config.go  67
-rw-r--r--  plugins/websockets/connection/connection.go  67
-rw-r--r--  plugins/websockets/doc/broadcast.drawio  1
-rw-r--r--  plugins/websockets/doc/broadcast_arch.drawio  1
-rw-r--r--  plugins/websockets/doc/ws.drawio  1
-rw-r--r--  plugins/websockets/executor/executor.go  121
-rw-r--r--  plugins/websockets/plugin.go  203
-rw-r--r--  plugins/websockets/pool/workers_pool.go  104
-rw-r--r--  plugins/websockets/rpc.go  47
-rw-r--r--  plugins/websockets/storage/storage.go  50
-rw-r--r--  plugins/websockets/validator/access_validator.go  102
-rw-r--r--  plugins/websockets/validator/access_validator_test.go  35
-rw-r--r--  tests/Dockerfile  0
-rw-r--r--  tests/docker-compose.yaml  3
-rw-r--r--  tests/plugins/http/handler_test.go  2
-rw-r--r--  tests/plugins/http/parse_test.go  2
-rw-r--r--  tests/plugins/http/response_test.go  2
-rw-r--r--  tests/plugins/http/uploads_test.go  2
-rw-r--r--  tests/plugins/informer/.rr-informer.yaml  6
-rw-r--r--  tests/plugins/websockets/configs/.rr-websockets-init.yaml  50
-rw-r--r--  tests/plugins/websockets/websocket_plugin_test.go  102
-rwxr-xr-x  utils/network.go  3
60 files changed, 1714 insertions, 227 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 82072675..24d839e5 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -13,6 +13,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
- version: v1.39 # without patch version
+ version: v1.41 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m
diff --git a/.gitignore b/.gitignore
index 9a9a07b6..beb3f885 100755
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ tests/vendor/
.rr-sample.yaml
cmd
rr
+**/old
diff --git a/.golangci.yml b/.golangci.yml
index bfc69f57..41e1d68f 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -14,8 +14,10 @@ output:
linters-settings:
govet:
check-shadowing: true
- golint:
- min-confidence: 0.1
+ revive:
+ confidence: 0.8
+ errorCode: 0
+ warningCode: 0
gocyclo:
min-complexity: 15
godot:
@@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- gocritic # The most opinionated Go source code linter
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
+ - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
- goprintffuncname # Checks that printf-like functions are named with `f` at the end
- gosec # Inspects source code for security problems
- gosimple # Linter for Go source code that specializes in simplifying a code
@@ -88,3 +90,4 @@ issues:
- goconst
- noctx
- gosimple
+ - revive
diff --git a/go.mod b/go.mod
index 72191467..19a9156a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,16 +8,20 @@ require (
github.com/alicebob/miniredis/v2 v2.14.5
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/dustin/go-humanize v1.0.0
+ github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-redis/redis/v8 v8.9.0
github.com/gofiber/fiber/v2 v2.10.0
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
+ github.com/google/uuid v1.2.0
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
+ github.com/klauspost/compress v1.12.2 // indirect
github.com/olekukonko/tablewriter v0.0.5
github.com/prometheus/client_golang v1.10.0
+ github.com/savsgio/gotils v0.0.0-20210316171653-c54912823645 // indirect
github.com/shirou/gopsutil v3.21.3+incompatible
github.com/spf13/viper v1.7.1
// SPIRAL ====
@@ -27,12 +31,13 @@ require (
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.4 // indirect
- github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
+ github.com/valyala/fasthttp v1.24.0 // indirect
+ github.com/valyala/tcplisten v1.0.0
github.com/yookoala/gofast v0.6.0
go.etcd.io/bbolt v1.3.5
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.17.0
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
- golang.org/x/sys v0.0.0-20210309074719-68d13333faf2
+ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
)
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
new file mode 100644
index 00000000..8477ceee
--- /dev/null
+++ b/pkg/bst/bst.go
@@ -0,0 +1,136 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{}
+}
+
+// Insert uuid to the topic
+func (b *BST) Insert(uuid string, topic string) {
+ curr := b
+
+ for {
+ if curr.topic == topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if curr.topic < topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (b *BST) Get(topic string) map[string]struct{} {
+ curr := b
+ for curr != nil {
+ if curr.topic == topic {
+ return curr.uuids
+ }
+ if curr.topic < topic {
+ curr = curr.left
+ continue
+ }
+ if curr.topic > topic {
+ curr = curr.right
+ continue
+ }
+ }
+
+ return nil
+}
+
+func (b *BST) Remove(uuid string, topic string) {
+ b.removeHelper(uuid, topic, nil)
+}
+
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+ curr := b
+ for curr != nil {
+ if topic < curr.topic { //nolint:gocritic
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil { //nolint:gocritic
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil { //nolint:gocritic
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else { //nolint:staticcheck
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (b *BST) traverseForMinString() (string, map[string]struct{}) {
+ if b.left == nil {
+ return b.topic, b.uuids
+ }
+ return b.left.traverseForMinString()
+}
diff --git a/pkg/bst/bst_test.go b/pkg/bst/bst_test.go
new file mode 100644
index 00000000..e8a13760
--- /dev/null
+++ b/pkg/bst/bst_test.go
@@ -0,0 +1,37 @@
+package bst
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewBST(t *testing.T) {
+ // create a new bst
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ // should be 100
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ // should be 100
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ // should be 100
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
diff --git a/pkg/bst/doc.go b/pkg/bst/doc.go
new file mode 100644
index 00000000..abb7e6e9
--- /dev/null
+++ b/pkg/bst/doc.go
@@ -0,0 +1,7 @@
+package bst
+
+/*
+Binary search tree for the pubsub
+
+The vertex may have one or multiply topics associated with the single websocket connection UUID
+*/
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/pkg/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
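
For reference, a minimal usage sketch of the Storage contract above (the import path is assumed from the repository layout; mirrors what bst_test.go exercises):

package main

import (
    "fmt"

    "github.com/google/uuid"
    "github.com/spiral/roadrunner/v2/pkg/bst"
)

func main() {
    // NewBST returns the BST-backed Storage implementation
    storage := bst.NewBST()

    // associate two connection UUIDs with one topic and one with another
    storage.Insert(uuid.NewString(), "comments")
    storage.Insert(uuid.NewString(), "comments")
    storage.Insert(uuid.NewString(), "news")

    // Get returns every connection UUID registered for a topic
    fmt.Println(len(storage.Get("comments"))) // 2
    fmt.Println(len(storage.Get("news")))     // 1
    fmt.Println(storage.Get("unknown"))       // nil: no such vertex
}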
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index 4ef2f2e7..c22fbbd3 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -18,7 +18,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
- // Remove worker from the pool.
+ // RemoveWorker removes worker from the pool.
RemoveWorker(worker worker.BaseProcess) error
// Destroy all underlying stack (but let them to complete the task).
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d57cc95c..8c9d69b9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:golint,stylecheck
+ err_encoder ErrorEncoder //nolint:stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 40903db3..ca61dbc4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -15,7 +15,7 @@ import (
const MB = 1024 * 1024
// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
type Supervised interface {
Pool
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
new file mode 100644
index 00000000..80dab0c3
--- /dev/null
+++ b/pkg/pubsub/interface.go
@@ -0,0 +1,38 @@
+package pubsub
+
+// PubSub ...
+type PubSub interface {
+ Publisher
+ Subscriber
+ Reader
+}
+
+// Subscriber defines the ability to operate as message passing broker.
+type Subscriber interface {
+ // Subscribe broker to one or multiple topics.
+ Subscribe(topics ...string) error
+ // Unsubscribe from one or multiply topics
+ Unsubscribe(topics ...string) error
+}
+
+// Publisher publish one or more messages
+type Publisher interface {
+ // Publish one or multiple Channel.
+ Publish(messages []Message) error
+
+ // PublishAsync publish message and return immediately
+ // If error occurred it will be printed into the logger
+ PublishAsync(messages []Message)
+}
+
+// Reader interface should return next message
+type Reader interface {
+ Next() (Message, error)
+}
+
+type Message interface {
+ Command() string
+ Payload() []byte
+ Topics() []string
+ Broker() string
+}
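
A short sketch of a consumer built against these interfaces; drainTopics is a hypothetical helper, and the nil check mirrors the in-memory driver below, which may hand out a nil message:

package example

import "github.com/spiral/roadrunner/v2/pkg/pubsub"

// drainTopics subscribes the broker to the given topics and forwards every
// message returned by Next into out until Next reports an error.
func drainTopics(ps pubsub.PubSub, out chan<- pubsub.Message, topics ...string) error {
    if err := ps.Subscribe(topics...); err != nil {
        return err
    }
    defer func() {
        _ = ps.Unsubscribe(topics...)
    }()

    for {
        m, err := ps.Next()
        if err != nil {
            return err
        }
        if m == nil {
            // some drivers may return a nil message (see the memory plugin below)
            continue
        }
        out <- m
    }
}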
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
new file mode 100644
index 00000000..2536aece
--- /dev/null
+++ b/pkg/pubsub/message.go
@@ -0,0 +1,42 @@
+package pubsub
+
+import (
+ json "github.com/json-iterator/go"
+)
+
+type Msg struct {
+ // Topic message been pushed into.
+ Topics_ []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ Command_ string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker_ string `json:"broker"`
+
+ // Payload to be broadcasted
+ Payload_ []byte `json:"payload"`
+}
+
+func (m *Msg) MarshalBinary() ([]byte, error) {
+ return json.Marshal(m)
+}
+
+// Payload in raw bytes
+func (m *Msg) Payload() []byte {
+ return m.Payload_
+}
+
+// Command for the connection
+func (m *Msg) Command() string {
+ return m.Command_
+}
+
+// Topics to subscribe
+func (m *Msg) Topics() []string {
+ return m.Topics_
+}
+
+func (m *Msg) Broker() string {
+ return m.Broker_
+}
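
A small sketch of the Msg type in use. MarshalBinary serializes through the json tags above (go-redis requires a BinaryMarshaler when a struct is published), and the []byte payload is base64-encoded in the resulting JSON:

package main

import (
    "fmt"

    "github.com/spiral/roadrunner/v2/pkg/pubsub"
)

func main() {
    m := &pubsub.Msg{
        Topics_:  []string{"comments"},
        Command_: "join",
        Broker_:  "memory",
        Payload_: []byte(`{"id":1}`),
    }

    // struct tags map Topics_ -> "topic", Command_ -> "command",
    // Broker_ -> "broker", Payload_ -> "payload"
    b, err := m.MarshalBinary()
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b))
}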
diff --git a/plugins/http/worker_handler/constants.go b/pkg/worker_handler/constants.go
index 3355d9c2..3355d9c2 100644
--- a/plugins/http/worker_handler/constants.go
+++ b/pkg/worker_handler/constants.go
diff --git a/plugins/http/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..5fa8e64e 100644
--- a/plugins/http/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
diff --git a/plugins/http/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..390cc7d1 100644
--- a/plugins/http/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
diff --git a/plugins/http/worker_handler/handler.go b/pkg/worker_handler/handler.go
index be53fc12..d98cdef0 100644
--- a/plugins/http/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -89,7 +89,7 @@ func (h *Handler) AddListener(l events.Listener) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serve_http")
start := time.Now()
// validating request size
diff --git a/plugins/http/worker_handler/parse.go b/pkg/worker_handler/parse.go
index 2790da2a..2790da2a 100644
--- a/plugins/http/worker_handler/parse.go
+++ b/pkg/worker_handler/parse.go
diff --git a/plugins/http/worker_handler/request.go b/pkg/worker_handler/request.go
index 178bc827..178bc827 100644
--- a/plugins/http/worker_handler/request.go
+++ b/pkg/worker_handler/request.go
diff --git a/plugins/http/worker_handler/response.go b/pkg/worker_handler/response.go
index 1763d304..1763d304 100644
--- a/plugins/http/worker_handler/response.go
+++ b/pkg/worker_handler/response.go
diff --git a/plugins/http/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
index e695000e..e695000e 100644
--- a/plugins/http/worker_handler/uploads.go
+++ b/pkg/worker_handler/uploads.go
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 4625b7a7..29fa3640 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -23,9 +23,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all container w/o removing it from internal storage
+ // List return all container w/o removing it from internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the container
+ // Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5aec4ee6..557563ac 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -11,7 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead container will not be replaced
+// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
@@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation, and it will return copy of the actual workers
+// List - this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 59ad981f..b3854e09 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -11,7 +11,7 @@ type Configurer interface {
// }
UnmarshalKey(name string, out interface{}) error
- // Unmarshal unmarshals the config into a Struct. Make sure that the tags
+ // Unmarshal unmarshal the config into a Struct. Make sure that the tags
// on the fields of the structure are properly set.
Unmarshal(out interface{}) error
@@ -24,6 +24,6 @@ type Configurer interface {
// Has checks if config section exists.
Has(name string) bool
- // Returns General section. Read-only
+ // GetCommonConfig returns General section. Read-only
GetCommonConfig() *General
}
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go
index 24b125fb..133704f8 100644
--- a/plugins/gzip/plugin.go
+++ b/plugins/gzip/plugin.go
@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/NYTimes/gziphandler"
+ "github.com/gofiber/fiber/v2"
)
const PluginName = "gzip"
@@ -21,6 +22,10 @@ func (g *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+func (g *Plugin) FiberMiddleware(ctx fiber.Ctx) {
+
+}
+
// Available interface implementation
func (g *Plugin) Available() {}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2b68bbe5..8bcffb63 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -13,10 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/status"
@@ -71,47 +71,47 @@ type Plugin struct {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
const op = errors.Op("http_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, err)
}
- err = s.cfg.InitDefaults()
+ err = p.cfg.InitDefaults()
if err != nil {
return errors.E(op, err)
}
// rr logger (via plugin)
- s.log = rrLogger
+ p.log = rrLogger
// use time and date in UTC format
- s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
+ p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
- s.mdwr = make(map[string]Middleware)
+ p.mdwr = make(map[string]Middleware)
- if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() {
+ if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() {
return errors.E(op, errors.Disabled)
}
// init if nil
- if s.cfg.Env == nil {
- s.cfg.Env = make(map[string]string)
+ if p.cfg.Env == nil {
+ p.cfg.Env = make(map[string]string)
}
- s.cfg.Env[RrMode] = "http"
- s.server = server
+ p.cfg.Env[RrMode] = "http"
+ p.server = server
return nil
}
-func (s *Plugin) logCallback(event interface{}) {
+func (p *Plugin) logCallback(event interface{}) {
if ev, ok := event.(handler.ResponseEvent); ok {
- s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
+ p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
"remote", ev.Request.RemoteAddr,
"elapsed", ev.Elapsed().String(),
)
@@ -119,60 +119,60 @@ func (s *Plugin) logCallback(event interface{}) {
}
// Serve serves the svc.
-func (s *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 2)
// run whole process in the goroutine
go func() {
// protect http initialization
- s.Lock()
- s.serve(errCh)
- s.Unlock()
+ p.Lock()
+ p.serve(errCh)
+ p.Unlock()
}()
return errCh
}
-func (s *Plugin) serve(errCh chan error) {
+func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler.AddListener(s.logCallback)
+ p.handler.AddListener(p.logCallback)
- if s.cfg.EnableHTTP() {
- if s.cfg.EnableH2C() {
- s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog}
+ if p.cfg.EnableHTTP() {
+ if p.cfg.EnableH2C() {
+ p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog}
} else {
- s.http = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ p.http = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
}
- if s.cfg.EnableTLS() {
- s.https = s.initSSL()
- if s.cfg.SSLConfig.RootCA != "" {
- err = s.appendRootCa()
+ if p.cfg.EnableTLS() {
+ p.https = p.initSSL()
+ if p.cfg.SSLConfig.RootCA != "" {
+ err = p.appendRootCa()
if err != nil {
errCh <- errors.E(op, err)
return
@@ -180,102 +180,102 @@ func (s *Plugin) serve(errCh chan error) {
}
// if HTTP2Config not nil
- if s.cfg.HTTP2Config != nil {
- if err := s.initHTTP2(); err != nil {
+ if p.cfg.HTTP2Config != nil {
+ if err := p.initHTTP2(); err != nil {
errCh <- errors.E(op, err)
return
}
}
}
- if s.cfg.EnableFCGI() {
- s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ if p.cfg.EnableFCGI() {
+ p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
// start http, https and fcgi servers if requested in the config
go func() {
- s.serveHTTP(errCh)
+ p.serveHTTP(errCh)
}()
go func() {
- s.serveHTTPS(errCh)
+ p.serveHTTPS(errCh)
}()
go func() {
- s.serveFCGI(errCh)
+ p.serveFCGI(errCh)
}()
}
// Stop stops the http.
-func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Stop() error {
+ p.Lock()
+ defer p.Unlock()
var err error
- if s.fcgi != nil {
- err = s.fcgi.Shutdown(context.Background())
+ if p.fcgi != nil {
+ err = p.fcgi.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the fcgi server", "error", err)
+ p.log.Error("error shutting down the fcgi server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
- if s.https != nil {
- err = s.https.Shutdown(context.Background())
+ if p.https != nil {
+ err = p.https.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the https server", "error", err)
+ p.log.Error("error shutting down the https server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
- if s.http != nil {
- err = s.http.Shutdown(context.Background())
+ if p.http != nil {
+ err = p.http.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the http server", "error", err)
+ p.log.Error("error shutting down the http server", "error", err)
// write error and try to stop other transport
err = multierror.Append(err)
}
}
// check for safety
- if s.pool != nil {
- s.pool.Destroy(context.Background())
+ if p.pool != nil {
+ p.pool.Destroy(context.Background())
}
return err
}
// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
-func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if headerContainsUpgrade(r) {
http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
return
}
- if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect {
- s.redirect(w, r)
+ if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect {
+ p.redirect(w, r)
return
}
- if s.https != nil && r.TLS != nil {
+ if p.https != nil && r.TLS != nil {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
r = attributes.Init(r)
// protect the case, when user sendEvent Reset and we are replacing handler with pool
- s.RLock()
- s.handler.ServeHTTP(w, r)
- s.RUnlock()
+ p.RLock()
+ p.handler.ServeHTTP(w, r)
+ p.RUnlock()
}
// Workers returns slice with the process states for the workers
-func (s *Plugin) Workers() []process.State {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Workers() []process.State {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
ps := make([]process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
@@ -290,74 +290,74 @@ func (s *Plugin) Workers() []process.State {
}
// internal
-func (s *Plugin) workers() []worker.BaseProcess {
- return s.pool.Workers()
+func (p *Plugin) workers() []worker.BaseProcess {
+ return p.pool.Workers()
}
// Name returns endure.Named interface implementation
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Reset destroys the old pool and replaces it with new one, waiting for old pool to die
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
const op = errors.Op("http_plugin_reset")
- s.log.Info("HTTP plugin got restart request. Restarting...")
- s.pool.Destroy(context.Background())
- s.pool = nil
+ p.log.Info("HTTP plugin got restart request. Restarting...")
+ p.pool.Destroy(context.Background())
+ p.pool = nil
var err error
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP workers Pool successfully restarted")
+ p.log.Info("HTTP workers Pool successfully restarted")
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP handler listeners successfully re-added")
- s.handler.AddListener(s.logCallback)
+ p.log.Info("HTTP handler listeners successfully re-added")
+ p.handler.AddListener(p.logCallback)
- s.log.Info("HTTP plugin successfully restarted")
+ p.log.Info("HTTP plugin successfully restarted")
return nil
}
// Collects collecting http middlewares
-func (s *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- s.AddMiddleware,
+ p.AddMiddleware,
}
}
// AddMiddleware is base requirement for the middleware (name and Middleware)
-func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
- s.mdwr[name.Name()] = m
+func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) {
+ p.mdwr[name.Name()] = m
}
// Status return status of the particular plugin
-func (s *Plugin) Status() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Status() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -372,14 +372,14 @@ func (s *Plugin) Status() status.Status {
}
// Ready return readiness status of the particular plugin
-func (s *Plugin) Ready() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Ready() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin's worker pool is ready
+ // we assume, that plugin'p worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
@@ -393,4 +393,4 @@ func (s *Plugin) Ready() status.Status {
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
diff --git a/plugins/http/serve.go b/plugins/http/serve.go
index 78796322..bf1ccafe 100644
--- a/plugins/http/serve.go
+++ b/plugins/http/serve.go
@@ -17,46 +17,46 @@ import (
"golang.org/x/sys/cpu"
)
-func (s *Plugin) serveHTTP(errCh chan error) {
- if s.http == nil {
+func (p *Plugin) serveHTTP(errCh chan error) {
+ if p.http == nil {
return
}
- const op = errors.Op("http_plugin_serve_http")
+ const op = errors.Op("serveHTTP")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.http, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.Address)
+ l, err := utils.CreateListener(p.cfg.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.http.Serve(l)
+ err = p.http.Serve(l)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) serveHTTPS(errCh chan error) {
- if s.https == nil {
+func (p *Plugin) serveHTTPS(errCh chan error) {
+ if p.https == nil {
return
}
- const op = errors.Op("http_plugin_serve_https")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ const op = errors.Op("serveHTTPS")
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.SSLConfig.Address)
+ l, err := utils.CreateListener(p.cfg.SSLConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = s.https.ServeTLS(
+ err = p.https.ServeTLS(
l,
- s.cfg.SSLConfig.Cert,
- s.cfg.SSLConfig.Key,
+ p.cfg.SSLConfig.Cert,
+ p.cfg.SSLConfig.Key,
)
if err != nil && err != http.ErrServerClosed {
@@ -66,34 +66,34 @@ func (s *Plugin) serveHTTPS(errCh chan error) {
}
// serveFCGI starts FastCGI server.
-func (s *Plugin) serveFCGI(errCh chan error) {
- if s.fcgi == nil {
+func (p *Plugin) serveFCGI(errCh chan error) {
+ if p.fcgi == nil {
return
}
- const op = errors.Op("http_plugin_serve_fcgi")
+ const op = errors.Op("serveFCGI")
- if len(s.mdwr) > 0 {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
+ if len(p.mdwr) > 0 {
+ applyMiddlewares(p.https, p.mdwr, p.cfg.Middleware, p.log)
}
- l, err := utils.CreateListener(s.cfg.FCGIConfig.Address)
+ l, err := utils.CreateListener(p.cfg.FCGIConfig.Address)
if err != nil {
errCh <- errors.E(op, err)
return
}
- err = fcgi.Serve(l, s.fcgi.Handler)
+ err = fcgi.Serve(l, p.fcgi.Handler)
if err != nil && err != http.ErrServerClosed {
errCh <- errors.E(op, err)
return
}
}
-func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) redirect(w http.ResponseWriter, r *http.Request) {
target := &url.URL{
Scheme: HTTPSScheme,
// host or host:port
- Host: s.tlsAddr(r.Host, false),
+ Host: p.tlsAddr(r.Host, false),
Path: r.URL.Path,
RawQuery: r.URL.RawQuery,
}
@@ -111,7 +111,7 @@ func headerContainsUpgrade(r *http.Request) bool {
}
// append RootCA to the https server TLS config
-func (s *Plugin) appendRootCa() error {
+func (p *Plugin) appendRootCa() error {
const op = errors.Op("http_plugin_append_root_ca")
rootCAs, err := x509.SystemCertPool()
if err != nil {
@@ -121,7 +121,7 @@ func (s *Plugin) appendRootCa() error {
rootCAs = x509.NewCertPool()
}
- CA, err := os.ReadFile(s.cfg.SSLConfig.RootCA)
+ CA, err := os.ReadFile(p.cfg.SSLConfig.RootCA)
if err != nil {
return err
}
@@ -137,13 +137,13 @@ func (s *Plugin) appendRootCa() error {
InsecureSkipVerify: false,
RootCAs: rootCAs,
}
- s.http.TLSConfig = cfg
+ p.http.TLSConfig = cfg
return nil
}
// Init https server
-func (s *Plugin) initSSL() *http.Server {
+func (p *Plugin) initSSL() *http.Server {
var topCipherSuites []uint16
var defaultCipherSuitesTLS13 []uint16
@@ -193,9 +193,9 @@ func (s *Plugin) initSSL() *http.Server {
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
sslServer := &http.Server{
- Addr: s.tlsAddr(s.cfg.Address, true),
- Handler: s,
- ErrorLog: s.stdLog,
+ Addr: p.tlsAddr(p.cfg.Address, true),
+ Handler: p,
+ ErrorLog: p.stdLog,
TLSConfig: &tls.Config{
CurvePreferences: []tls.CurveID{
tls.CurveP256,
@@ -213,19 +213,19 @@ func (s *Plugin) initSSL() *http.Server {
}
// init http/2 server
-func (s *Plugin) initHTTP2() error {
- return http2.ConfigureServer(s.https, &http2.Server{
- MaxConcurrentStreams: s.cfg.HTTP2Config.MaxConcurrentStreams,
+func (p *Plugin) initHTTP2() error {
+ return http2.ConfigureServer(p.https, &http2.Server{
+ MaxConcurrentStreams: p.cfg.HTTP2Config.MaxConcurrentStreams,
})
}
// tlsAddr replaces listen or host port with port configured by SSLConfig config.
-func (s *Plugin) tlsAddr(host string, forcePort bool) string {
+func (p *Plugin) tlsAddr(host string, forcePort bool) string {
// remove current forcePort first
host = strings.Split(host, ":")[0]
- if forcePort || s.cfg.SSLConfig.Port != 443 {
- host = fmt.Sprintf("%s:%v", host, s.cfg.SSLConfig.Port)
+ if forcePort || p.cfg.SSLConfig.Port != 443 {
+ host = fmt.Sprintf("%s:%v", host, p.cfg.SSLConfig.Port)
}
return host
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 2e2df527..0f647cb1 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
@@ -393,7 +394,7 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- err := b.Delete([]byte(k))
+ err := b.Delete(utils.AsBytes(k))
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 17b06fa0..02281ed5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -148,7 +149,7 @@ func (d *Driver) Set(items ...kv.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: []byte(items[i].Value),
+ Value: utils.AsBytes(items[i].Value),
Flags: 0,
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 1e0d03d4..c2494ee7 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -9,6 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -70,7 +71,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(kv.Item).Value), nil
}
return nil, nil
}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
index d2183411..3694c5a7 100644
--- a/plugins/kv/drivers/redis/plugin.go
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -28,7 +28,7 @@ func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
// Serve is noop here
func (s *Plugin) Serve() chan error {
- return make(chan error, 1)
+ return make(chan error)
}
func (s *Plugin) Stop() error {
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index fe2fa10b..9a609735 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -90,9 +90,9 @@ func (p *Plugin) Serve() chan error {
return errCh
}
- // config key for the particular sub-driver
+ // config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
- // at this point we know, that driver field present in the cofiguration
+ // at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.drivers[memcached]; !ok {
diff --git a/plugins/logger/std_log_adapter.go b/plugins/logger/std_log_adapter.go
index 484cc23e..479aa565 100644
--- a/plugins/logger/std_log_adapter.go
+++ b/plugins/logger/std_log_adapter.go
@@ -1,7 +1,7 @@
package logger
import (
- "unsafe"
+ "github.com/spiral/roadrunner/v2/utils"
)
// StdLogAdapter can be passed to the http.Server or any place which required standard logger to redirect output
@@ -12,7 +12,7 @@ type StdLogAdapter struct {
// Write io.Writer interface implementation
func (s *StdLogAdapter) Write(p []byte) (n int, err error) {
- s.log.Error("server internal error", "message", toString(p))
+ s.log.Error("server internal error", "message", utils.AsString(p))
return len(p), nil
}
@@ -24,8 +24,3 @@ func NewStdAdapter(log Logger) *StdLogAdapter {
return logAdapter
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
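
The deleted toString helper moves into the shared utils package as AsString, with AsBytes as its counterpart used by the KV drivers above. A sketch of those zero-copy conversions, assuming they follow the same unsafe pattern as the removed helper:

package utils

import "unsafe"

// AsString converts a byte slice to a string without copying.
// The slice must not be mutated while the string is in use.
func AsString(b []byte) string {
    return *(*string)(unsafe.Pointer(&b))
}

// AsBytes converts a string to a byte slice without copying.
// The returned slice must be treated as read-only.
func AsBytes(s string) []byte {
    return *(*[]byte)(unsafe.Pointer(
        &struct {
            string
            Cap int
        }{s, len(s)},
    ))
}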
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..2ad041aa
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,76 @@
+package memory
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memory"
+)
+
+type Plugin struct {
+ log logger.Logger
+
+ // channel with the messages from the RPC
+ pushCh chan pubsub.Message
+ // user-subscribed topics
+ topics sync.Map
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ p.pushCh = make(chan pubsub.Message, 100)
+ return nil
+}
+
+// Available interface implementation for the plugin
+func (p *Plugin) Available() {}
+
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Publish(messages []pubsub.Message) error {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(messages []pubsub.Message) {
+ go func() {
+ for i := 0; i < len(messages); i++ {
+ p.pushCh <- messages[i]
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Store(topics[i], struct{}{})
+ }
+ return nil
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ for i := 0; i < len(topics); i++ {
+ p.topics.Delete(topics[i])
+ }
+ return nil
+}
+
+func (p *Plugin) Next() (pubsub.Message, error) {
+ msg := <-p.pushCh
+ // push only messages, which are subscribed
+ // TODO better???
+ for i := 0; i < len(msg.Topics()); i++ {
+ if _, ok := p.topics.Load(msg.Topics()[i]); ok {
+ return msg, nil
+ }
+ }
+ return nil, nil
+}
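
A minimal sketch of the in-memory driver driven directly through the pubsub interfaces (a nil logger is passed only because this path never logs):

package main

import (
    "fmt"

    "github.com/spiral/roadrunner/v2/pkg/pubsub"
    "github.com/spiral/roadrunner/v2/plugins/memory"
)

func main() {
    p := &memory.Plugin{}
    if err := p.Init(nil); err != nil {
        panic(err)
    }

    // only messages for subscribed topics are handed out by Next
    if err := p.Subscribe("comments"); err != nil {
        panic(err)
    }

    msg := &pubsub.Msg{
        Topics_:  []string{"comments"},
        Command_: "join",
        Broker_:  "memory",
        Payload_: []byte("hello"),
    }
    if err := p.Publish([]pubsub.Message{msg}); err != nil {
        panic(err)
    }

    m, err := p.Next()
    if err != nil {
        panic(err)
    }
    fmt.Println(string(m.Payload())) // hello
}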
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
new file mode 100644
index 00000000..8e924b2d
--- /dev/null
+++ b/plugins/redis/fanin.go
@@ -0,0 +1,100 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type FanIn struct {
+ sync.Mutex
+
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan pubsub.Message
+
+ exit chan struct{}
+}
+
+func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+ out := make(chan pubsub.Message, 100)
+ fi := &FanIn{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (fi *FanIn) AddChannel(topics ...string) error {
+ const op = errors.Op("fanin_addchannel")
+ err := fi.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (fi *FanIn) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the websockets.Msg interface
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-fi.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+ m := &pubsub.Msg{}
+ err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ continue
+ }
+
+ fi.out <- m
+ case <-fi.exit:
+ return
+ }
+ }
+}
+
+func (fi *FanIn) RemoveChannel(topics ...string) error {
+ const op = errors.Op("fanin_remove")
+ err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (fi *FanIn) Stop() error {
+ fi.exit <- struct{}{}
+ close(fi.out)
+ close(fi.exit)
+ return nil
+}
+
+func (fi *FanIn) Consume() <-chan pubsub.Message {
+ return fi.out
+}
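
A sketch of the fan-in wired by hand (the plugin below does the same in Init/Subscribe); the redis address is assumed, and a nil logger is acceptable here because it is only consulted on unmarshal failures:

package main

import (
    "fmt"

    "github.com/go-redis/redis/v8"
    rr "github.com/spiral/roadrunner/v2/plugins/redis"
)

func main() {
    // a redis instance is assumed to be reachable at this address
    client := redis.NewUniversalClient(&redis.UniversalOptions{
        Addrs: []string{"127.0.0.1:6379"},
    })

    // NewFanIn opens the PUBSUB subscription and starts the read loop
    fi := rr.NewFanIn(client, nil)
    defer func() { _ = fi.Stop() }()

    // route the "comments" channel into the shared output channel
    if err := fi.AddChannel("comments"); err != nil {
        panic(err)
    }

    // every redis message whose payload unmarshals into pubsub.Msg
    // re-appears on Consume()
    for m := range fi.Consume() {
        fmt.Println(m.Command(), m.Topics(), string(m.Payload()))
    }
}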
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 2eab7043..24ed1f92 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,8 +1,12 @@
package redis
import (
+ "context"
+ "sync"
+
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -10,73 +14,133 @@ import (
const PluginName = "redis"
type Plugin struct {
+ sync.Mutex
// config for RR integration
cfg *Config
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
+
+ fanin *FanIn
}
-func (s *Plugin) GetClient() redis.UniversalClient {
- return s.universalClient
+func (p *Plugin) GetClient() redis.UniversalClient {
+ return p.universalClient
}
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("redis_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- s.cfg.InitDefaults()
- s.log = log
-
- s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: s.cfg.Addrs,
- DB: s.cfg.DB,
- Username: s.cfg.Username,
- Password: s.cfg.Password,
- SentinelPassword: s.cfg.SentinelPassword,
- MaxRetries: s.cfg.MaxRetries,
- MinRetryBackoff: s.cfg.MaxRetryBackoff,
- MaxRetryBackoff: s.cfg.MaxRetryBackoff,
- DialTimeout: s.cfg.DialTimeout,
- ReadTimeout: s.cfg.ReadTimeout,
- WriteTimeout: s.cfg.WriteTimeout,
- PoolSize: s.cfg.PoolSize,
- MinIdleConns: s.cfg.MinIdleConns,
- MaxConnAge: s.cfg.MaxConnAge,
- PoolTimeout: s.cfg.PoolTimeout,
- IdleTimeout: s.cfg.IdleTimeout,
- IdleCheckFrequency: s.cfg.IdleCheckFreq,
- ReadOnly: s.cfg.ReadOnly,
- RouteByLatency: s.cfg.RouteByLatency,
- RouteRandomly: s.cfg.RouteRandomly,
- MasterName: s.cfg.MasterName,
+ p.cfg.InitDefaults()
+ p.log = log
+
+ p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: p.cfg.Addrs,
+ DB: p.cfg.DB,
+ Username: p.cfg.Username,
+ Password: p.cfg.Password,
+ SentinelPassword: p.cfg.SentinelPassword,
+ MaxRetries: p.cfg.MaxRetries,
+ MinRetryBackoff: p.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: p.cfg.MaxRetryBackoff,
+ DialTimeout: p.cfg.DialTimeout,
+ ReadTimeout: p.cfg.ReadTimeout,
+ WriteTimeout: p.cfg.WriteTimeout,
+ PoolSize: p.cfg.PoolSize,
+ MinIdleConns: p.cfg.MinIdleConns,
+ MaxConnAge: p.cfg.MaxConnAge,
+ PoolTimeout: p.cfg.PoolTimeout,
+ IdleTimeout: p.cfg.IdleTimeout,
+ IdleCheckFrequency: p.cfg.IdleCheckFreq,
+ ReadOnly: p.cfg.ReadOnly,
+ RouteByLatency: p.cfg.RouteByLatency,
+ RouteRandomly: p.cfg.RouteRandomly,
+ MasterName: p.cfg.MasterName,
})
+ // init fanin
+ p.fanin = NewFanIn(p.universalClient, log)
+
return nil
}
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
return errCh
}
-func (s Plugin) Stop() error {
- return s.universalClient.Close()
+func (p *Plugin) Stop() error {
+ const op = errors.Op("redis_plugin_stop")
+ err := p.fanin.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Available interface implementation
-func (s *Plugin) Available() {
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ f := p.universalClient.Publish(context.Background(), msg[i].Topics()[j], msg[i])
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", msg[i].Topics()[j], "error", f.Err().Error())
+ continue
+ }
+ }
+ }
+ }()
+}
+
+func (p *Plugin) Subscribe(topics ...string) error {
+ return p.fanin.AddChannel(topics...)
+}
+
+func (p *Plugin) Unsubscribe(topics ...string) error {
+ return p.fanin.RemoveChannel(topics...)
+}
+
+// Next return next message
+func (p *Plugin) Next() (pubsub.Message, error) {
+ return <-p.fanin.Consume(), nil
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ef77f7ab..aab9dcde 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,9 +25,9 @@ import (
const PluginName = "server"
// RR_RELAY env variable key (internal)
-const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+const RR_RELAY = "RR_RELAY" //nolint:stylecheck
// RR_RPC env variable key (internal) if the RPC presents
-const RR_RPC = "RR_RPC" //nolint:golint,stylecheck
+const RR_RPC = "RR_RPC" //nolint:stylecheck
// Plugin manages worker
type Plugin struct {
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
new file mode 100644
index 00000000..18c63be3
--- /dev/null
+++ b/plugins/websockets/commands/enums.go
@@ -0,0 +1,9 @@
+package commands
+
+type Command string
+
+const (
+ Leave string = "leave"
+ Join string = "join"
+ Headers string = "headers"
+)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
new file mode 100644
index 00000000..f3cb8e12
--- /dev/null
+++ b/plugins/websockets/config.go
@@ -0,0 +1,67 @@
+package websockets
+
+import "time"
+
+/*
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs:["redis", "amqp", "memory"]
+
+ # sample of the own config section for the redis pubsub driver
+ redis:
+ address:
+ - localhost:1111
+ .... the rest
+
+
+ # path used as websockets path
+ path: "/ws"
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"path"`
+ // ["redis", "amqp", "memory"]
+ PubSubs []string `mapstructure:"pubsubs"`
+ Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+}
+
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+ if len(c.PubSubs) == 0 {
+ // memory used by default
+ c.PubSubs = append(c.PubSubs, "memory")
+ }
+}
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
new file mode 100644
index 00000000..2b847173
--- /dev/null
+++ b/plugins/websockets/connection/connection.go
@@ -0,0 +1,67 @@
+package connection
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Connection represents wrapped and safe to use from the different threads websocket connection
+type Connection struct {
+ sync.RWMutex
+ log logger.Logger
+ conn *websocket.Conn
+}
+
+func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
+ return &Connection{
+ conn: wsConn,
+ log: log,
+ }
+}
+
+func (c *Connection) Write(mt int, data []byte) error {
+ c.Lock()
+ defer c.Unlock()
+
+ const op = errors.Op("websocket_write")
+ // handle a case when a goroutine tried to write into the closed connection
+ defer func() {
+ if r := recover(); r != nil {
+ c.log.Warn("panic handled, tried to write into the closed connection")
+ }
+ }()
+
+ err := c.conn.WriteMessage(mt, data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *Connection) Read() (int, []byte, error) {
+ const op = errors.Op("websocket_read")
+
+ mt, data, err := c.conn.ReadMessage()
+ if err != nil {
+ return -1, nil, errors.E(op, err)
+ }
+
+ return mt, data, nil
+}
+
+func (c *Connection) Close() error {
+ c.Lock()
+ defer c.Unlock()
+ const op = errors.Op("websocket_close")
+
+ err := c.conn.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
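
A sketch of the wrapper in use: one Connection shared by a read loop and any broadcasting goroutines (obtaining the raw *websocket.Conn from an upgrader is out of scope here):

package example

import (
    "github.com/fasthttp/websocket"
    "github.com/spiral/roadrunner/v2/plugins/logger"
    "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
)

// echo wraps an already-upgraded websocket.Conn and echoes every frame back.
// The mutex inside Connection makes concurrent writes safe, and a write into
// an already-closed connection is recovered and logged instead of panicking.
func echo(raw *websocket.Conn, log logger.Logger) error {
    conn := connection.NewConnection(raw, log)
    defer func() {
        _ = conn.Close()
    }()

    for {
        mt, data, err := conn.Read()
        if err != nil {
            return err
        }
        if err := conn.Write(mt, data); err != nil {
            return err
        }
    }
}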
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
new file mode 100644
index 00000000..748fec45
--- /dev/null
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..54bba9c7
--- /dev/null
+++ b/plugins/websockets/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio
new file mode 100644
index 00000000..739b797a
--- /dev/null
+++ b/plugins/websockets/doc/ws.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..9ef5e40a
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,121 @@
+package executor
+
+import (
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+ pubsub map[string]pubsub.PubSub
+}
+
+// NewExecutor creates an Executor over the protected connection; the command loop is started separately via StartCommandLoop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error {
+ for {
+ _, data, err := e.conn.Read()
+ if err != nil {
+ // the read failed or the connection was closed; stop the command loop
+ return err
+ }
+
+ msg := &pubsub.Msg{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ switch msg.Command() {
+ // handle join
+ case commands.Join:
+ // associate connection with topics
+ e.storage.Store(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ // handle leave
+ case commands.Leave:
+ // remove associated connections from the storage
+ e.storage.Remove(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ case commands.Headers:
+ // the headers command is currently a no-op
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command())
+ }
+ }
+}
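
Note: the command loop above decodes each frame into a pubsub.Msg and acknowledges join/leave commands with an "@join"/"@leave" response. The self-contained sketch below shows that round trip; the JSON field names of clientMsg are assumptions for illustration only, the authoritative wire format is defined in pkg/pubsub/message.go, which is not shown in this excerpt.

package main

import (
	"encoding/json"
	"fmt"
)

// clientMsg approximates what a browser client sends over the websocket;
// the field names here are hypothetical stand-ins for pubsub.Msg.
type clientMsg struct {
	Command string   `json:"command"`
	Broker  string   `json:"broker"`
	Topics  []string `json:"topic"`
}

// response mirrors the Response struct used by the executor
type response struct {
	Topic   string   `json:"topic"`
	Payload []string `json:"payload"`
}

func main() {
	frame := []byte(`{"command":"join","broker":"redis","topic":["foo","bar"]}`)

	msg := &clientMsg{}
	if err := json.Unmarshal(frame, msg); err != nil {
		panic(err)
	}

	// the executor acknowledges the command, then (un)subscribes the broker
	switch msg.Command {
	case "join":
		resp, _ := json.Marshal(&response{Topic: "@join", Payload: msg.Topics})
		fmt.Println(string(resp))
	case "leave":
		resp, _ := json.Marshal(&response{Topic: "@leave", Payload: msg.Topics})
		fmt.Println(string(resp))
	default:
		fmt.Println("unknown command:", msg.Command)
	}
}
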
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
new file mode 100644
index 00000000..bc5028e6
--- /dev/null
+++ b/plugins/websockets/plugin.go
@@ -0,0 +1,203 @@
+package websockets
+
+import (
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+const (
+ PluginName string = "websockets"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ // Collection with all available pubsubs
+ pubsubs map[string]pubsub.PubSub
+
+ Config *Config
+ log logger.Logger
+
+ // global connections map
+ connections sync.Map
+ storage *storage.Storage
+
+ workersPool *pool.WorkersPool
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("websockets_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.Config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.pubsubs = make(map[string]pubsub.PubSub)
+ p.log = log
+ p.storage = storage.NewStorage()
+ p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+
+ // run all pubsub drivers
+ for _, v := range p.pubsubs {
+ go func(ps pubsub.PubSub) {
+ for {
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
+
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }(v)
+ }
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ p.workersPool.Stop()
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetPublishers,
+ }
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// GetPublishers collects all pubsubs
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
+ p.pubsubs[name.Name()] = pub
+}
+
+func (p *Plugin) Middleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != p.Config.Path {
+ next.ServeHTTP(w, r)
+ return
+ }
+
+ // connection upgrader
+ upgraded := websocket.Upgrader{
+ HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 0,
+ WriteBufferSize: 0,
+ WriteBufferPool: nil,
+ Subprotocols: nil,
+ Error: nil,
+ CheckOrigin: nil,
+ EnableCompression: false,
+ }
+
+ // upgrade connection to websocket connection
+ _conn, err := upgraded.Upgrade(w, r, nil)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ safeConn := connection.NewConnection(_conn, p.log)
+ defer func() {
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close error", "error", err)
+ }
+ }()
+
+ // generate a UUID for the connection
+ connectionID := uuid.NewString()
+ // store connection
+ p.connections.Store(connectionID, safeConn)
+ // when exiting - delete the connection
+ defer func() {
+ p.connections.Delete(connectionID)
+ }()
+
+ // the Executor wraps the protected connection to provide a safe abstraction
+ p.Lock()
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs)
+ p.Unlock()
+
+ p.log.Info("websocket client connected", "uuid", connectionID)
+
+ err = e.StartCommandLoop()
+ if err != nil {
+ p.log.Error("command loop error", "error", err.Error())
+ return
+ }
+ })
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker()]; ok {
+ err := br.Publish(msg)
+ if err != nil {
+ return errors.E(err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker())
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ err := p.pubsubs[msg[i].Broker()].Publish(msg)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ }
+ }
+ }()
+}
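
Note: Plugin.Middleware only intercepts requests whose path equals Config.Path and hands everything else to the next handler before attempting the websocket upgrade. A reduced, runnable sketch of that gating follows, using plain net/http and no actual upgrade; "/ws" stands in for the configured path.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// wsMiddleware routes only the configured path into the websocket branch
// and delegates everything else to the regular HTTP pipeline.
func wsMiddleware(path string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != path {
			next.ServeHTTP(w, r)
			return
		}
		// here the real plugin would run websocket.Upgrader.Upgrade(w, r, nil)
		fmt.Fprint(w, "would upgrade to websocket")
	})
}

func main() {
	next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "regular handler")
	})
	h := wsMiddleware("/ws", next)

	for _, target := range []string{"/ws", "/api"} {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
		fmt.Printf("%s -> %s\n", target, rec.Body.String())
	}
}
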
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
new file mode 100644
index 00000000..ee31d62f
--- /dev/null
+++ b/plugins/websockets/pool/workers_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type WorkersPool struct {
+ storage *storage.Storage
+ connections *sync.Map
+ resPool sync.Pool
+ log logger.Logger
+
+ queue chan pubsub.Message
+ exit chan struct{}
+}
+
+func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+ wp := &WorkersPool{
+ connections: connections,
+ queue: make(chan pubsub.Message, 100),
+ storage: storage,
+ log: log,
+ exit: make(chan struct{}),
+ }
+
+ wp.resPool.New = func() interface{} {
+ return make(map[string]struct{}, 10)
+ }
+
+ for i := 0; i < 10; i++ {
+ wp.do()
+ }
+
+ return wp
+}
+
+func (wp *WorkersPool) Queue(msg pubsub.Message) {
+ wp.queue <- msg
+}
+
+func (wp *WorkersPool) Stop() {
+ for i := 0; i < 10; i++ {
+ wp.exit <- struct{}{}
+ }
+
+ close(wp.exit)
+}
+
+func (wp *WorkersPool) put(res map[string]struct{}) {
+ // the delete-in-range loop is recognized by the compiler and compiled
+ // into a single runtime map-clear operation
+ // https://go-review.googlesource.com/c/go/+/110055/
+ for k := range res {
+ delete(res, k)
+ }
+
+ // return the cleared map to the pool so get() can reuse it
+ wp.resPool.Put(res)
+}
+
+func (wp *WorkersPool) get() map[string]struct{} {
+ return wp.resPool.Get().(map[string]struct{})
+}
+
+func (wp *WorkersPool) do() {
+ go func() {
+ for {
+ select {
+ case msg := <-wp.queue:
+ res := wp.get()
+ // get connections for the particular topic
+ wp.storage.Get(msg.Topics(), res)
+ if len(res) == 0 {
+ wp.log.Info("no such topic", "topic", msg.Topics())
+ wp.put(res)
+ continue
+ }
+
+ for i := range res {
+ c, ok := wp.connections.Load(i)
+ if !ok {
+ // the connection might have been removed between Get and Load; skip it
+ wp.log.Warn("connection not found in the connections map", "connection ID", i)
+ continue
+ }
+
+ conn := c.(*connection.Connection)
+ err := conn.Write(websocket.BinaryMessage, msg.Payload())
+ if err != nil {
+ wp.log.Error("error sending the payload over the connection", "error", err, "connection ID", i)
+ // the result set is returned to the pool once, after the loop
+ continue
+ }
+ }
+
+ wp.put(res)
+ case <-wp.exit:
+ wp.log.Info("get exit signal, exiting from the workers pool")
+ return
+ }
+ }
+ }()
+}
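
Note: the workers pool recycles its per-message result sets through a sync.Pool: get() takes a map, the storage fills it, and put() clears it and hands it back. A self-contained sketch of that recycling; resPool, getRes and putRes are local to the example.

package main

import (
	"fmt"
	"sync"
)

var resPool = sync.Pool{
	New: func() interface{} { return make(map[string]struct{}, 10) },
}

func getRes() map[string]struct{} { return resPool.Get().(map[string]struct{}) }

func putRes(res map[string]struct{}) {
	// recognized by the compiler and lowered to a single runtime map clear
	for k := range res {
		delete(res, k)
	}
	resPool.Put(res)
}

func main() {
	res := getRes()
	res["conn-1"] = struct{}{}
	res["conn-2"] = struct{}{}
	fmt.Println("before put:", len(res))

	putRes(res)

	reused := getRes()
	fmt.Println("after reuse:", len(reused)) // 0: the map was cleared, not reallocated
}
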
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..2b3ae54e
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,47 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// rpc service exposing the publish methods of the websockets plugin
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+ err := r.plugin.Publish(mi)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error {
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+
+ r.plugin.PublishAsync(mi)
+
+ *ok = true
+ return nil
+}
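
Note: both RPC methods rebuild the []*pubsub.Msg argument into a []pubsub.Message because Go does not implicitly convert a slice of concrete types into a slice of interfaces. A stand-alone illustration; Message and Msg here are local stand-ins, not the real pubsub types.

package main

import "fmt"

type Message interface {
	Topics() []string
}

type Msg struct {
	T []string
}

func (m *Msg) Topics() []string { return m.T }

func publish(batch []Message) {
	for _, m := range batch {
		fmt.Println("publishing to", m.Topics())
	}
}

func main() {
	in := []*Msg{{T: []string{"foo"}}, {T: []string{"bar"}}}

	// publish(in) would not compile: []*Msg is not []Message,
	// even though *Msg implements Message
	out := make([]Message, 0, len(in))
	for i := 0; i < len(in); i++ {
		out = append(out, in[i])
	}
	publish(out)
}
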
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
new file mode 100644
index 00000000..34f53cfd
--- /dev/null
+++ b/plugins/websockets/storage/storage.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/bst"
+)
+
+type Storage struct {
+ sync.RWMutex
+ BST bst.Storage
+}
+
+func NewStorage() *Storage {
+ return &Storage{
+ BST: bst.NewBST(),
+ }
+}
+
+func (s *Storage) Store(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Insert(connID, topics[i])
+ }
+}
+
+func (s *Storage) Remove(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Remove(connID, topics[i])
+ }
+}
+
+func (s *Storage) Get(topics []string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
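
Note: Storage is a thin, lock-guarded index from topics to connection IDs built on pkg/bst. The sketch below shows the same Store/Get/Remove semantics with a plain map of sets standing in for bst.Storage; it is an illustration, not the BST implementation.

package main

import "fmt"

type topicIndex map[string]map[string]struct{} // topic -> set of connection IDs

func (t topicIndex) Store(connID string, topics []string) {
	for _, tp := range topics {
		if t[tp] == nil {
			t[tp] = make(map[string]struct{})
		}
		t[tp][connID] = struct{}{}
	}
}

func (t topicIndex) Remove(connID string, topics []string) {
	for _, tp := range topics {
		delete(t[tp], connID)
	}
}

// Get collects all connection IDs subscribed to any of the topics into res,
// mirroring Storage.Get above.
func (t topicIndex) Get(topics []string, res map[string]struct{}) {
	for _, tp := range topics {
		for connID := range t[tp] {
			res[connID] = struct{}{}
		}
	}
}

func main() {
	idx := topicIndex{}
	idx.Store("conn-1", []string{"news", "chat"})
	idx.Store("conn-2", []string{"chat"})

	res := make(map[string]struct{})
	idx.Get([]string{"chat"}, res)
	fmt.Println(len(res), "connections subscribed to chat") // 2

	idx.Remove("conn-2", []string{"chat"})
	res = make(map[string]struct{})
	idx.Get([]string{"chat"}, res)
	fmt.Println(len(res), "connections subscribed to chat") // 1
}
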
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
new file mode 100644
index 00000000..9d9522d4
--- /dev/null
+++ b/plugins/websockets/validator/access_validator.go
@@ -0,0 +1,102 @@
+package validator
+
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+)
+
+type AccessValidator struct {
+ buffer *bytes.Buffer
+ header http.Header
+ status int
+}
+
+func NewValidator() *AccessValidator {
+ return &AccessValidator{
+ buffer: bytes.NewBuffer(nil),
+ header: make(http.Header),
+ }
+}
+
+// Copy all content to parent response writer.
+func (w *AccessValidator) Copy(rw http.ResponseWriter) {
+ // headers must be copied before WriteHeader is called on the parent
+ // writer, otherwise they would be silently dropped
+ for k, v := range w.header {
+ for _, vv := range v {
+ rw.Header().Add(k, vv)
+ }
+ }
+
+ rw.WriteHeader(w.status)
+
+ _, _ = io.Copy(rw, w.buffer)
+}
+
+// Header returns the header map that will be sent by WriteHeader.
+func (w *AccessValidator) Header() http.Header {
+ return w.header
+}
+
+// Write writes the data to the connection as part of an HTTP reply.
+func (w *AccessValidator) Write(p []byte) (int, error) {
+ return w.buffer.Write(p)
+}
+
+// WriteHeader sends an HTTP response header with the provided status code.
+func (w *AccessValidator) WriteHeader(statusCode int) {
+ w.status = statusCode
+}
+
+// IsOK returns true if response contained 200 status code.
+func (w *AccessValidator) IsOK() bool {
+ return w.status == 200
+}
+
+// Body returns the response body to reply to the user.
+func (w *AccessValidator) Body() []byte {
+ return w.buffer.Bytes()
+}
+
+// Error returns the buffered server response as an error message.
+func (w *AccessValidator) Error() string {
+ return w.buffer.String()
+}
+
+// AssertServerAccess checks if the user is allowed to join the server. It returns an error carrying the response
+// body when access is denied and nil on success.
+func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error {
+ if err := attributes.Set(r, "ws:joinServer", true); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinServer")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
+
+// AssertTopicsAccess checks if the user may access the given topics (upstream). The application receives all user
+// headers and cookies; the decision to authorize the user is based on the response code (200 grants access).
+func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
+ if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinTopics")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
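
Note: AccessValidator is a buffering http.ResponseWriter, which lets the plugin run the application's own HTTP handler as an authorization check: a 200 status grants access, anything else becomes an error carrying the response body. A reduced, runnable sketch of that idea; recorder and assertAccess are illustrative stand-ins, not the plugin's API.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// recorder is a minimal buffering ResponseWriter, like AccessValidator.
type recorder struct {
	buf    bytes.Buffer
	header http.Header
	status int
}

func newRecorder() *recorder { return &recorder{header: make(http.Header)} }

func (r *recorder) Header() http.Header         { return r.header }
func (r *recorder) Write(p []byte) (int, error) { return r.buf.Write(p) }
func (r *recorder) WriteHeader(code int)        { r.status = code }

// assertAccess runs the application handler against the recorder and
// interprets its status code as the authorization decision.
func assertAccess(f http.HandlerFunc, req *http.Request) error {
	rec := newRecorder()
	f(rec, req)
	if rec.status != http.StatusOK {
		return fmt.Errorf("access denied: %s", rec.buf.String())
	}
	return nil
}

func main() {
	allow := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }
	deny := func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusForbidden)
		_, _ = w.Write([]byte("not allowed"))
	}

	req := httptest.NewRequest(http.MethodGet, "/ws", nil)
	fmt.Println(assertAccess(allow, req)) // <nil>
	fmt.Println(assertAccess(deny, req))  // access denied: not allowed
}
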
diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go
new file mode 100644
index 00000000..4a07b00f
--- /dev/null
+++ b/plugins/websockets/validator/access_validator_test.go
@@ -0,0 +1,35 @@
+package validator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResponseWrapper_Body(t *testing.T) {
+ w := NewValidator()
+ _, _ = w.Write([]byte("hello"))
+
+ assert.Equal(t, []byte("hello"), w.Body())
+}
+
+func TestResponseWrapper_Header(t *testing.T) {
+ w := NewValidator()
+ w.Header().Set("k", "value")
+
+ assert.Equal(t, "value", w.Header().Get("k"))
+}
+
+func TestResponseWrapper_StatusCode(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(200)
+
+ assert.True(t, w.IsOK())
+}
+
+func TestResponseWrapper_StatusCodeBad(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(400)
+
+ assert.False(t, w.IsOK())
+}
diff --git a/tests/Dockerfile b/tests/Dockerfile
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/Dockerfile
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 67d5476b..a8e8a7c5 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -7,5 +7,8 @@ services:
- "0.0.0.0:11211:11211"
redis:
image: redis:6
+ mem_limit: 16384m
+ mem_reservation: 2048M
+ cpus: 8
ports:
- "6379:6379"
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 575fe656..f6533dc4 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -12,8 +12,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
"net/http"
diff --git a/tests/plugins/http/parse_test.go b/tests/plugins/http/parse_test.go
index 15c82839..d75620f3 100644
--- a/tests/plugins/http/parse_test.go
+++ b/tests/plugins/http/parse_test.go
@@ -3,7 +3,7 @@ package http
import (
"testing"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
)
var samples = []struct {
diff --git a/tests/plugins/http/response_test.go b/tests/plugins/http/response_test.go
index 3564d9cd..276c22ef 100644
--- a/tests/plugins/http/response_test.go
+++ b/tests/plugins/http/response_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"github.com/spiral/roadrunner/v2/pkg/payload"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index 5c39589c..903a930a 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -18,8 +18,8 @@ import (
j "github.com/json-iterator/go"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/http/config"
- handler "github.com/spiral/roadrunner/v2/plugins/http/worker_handler"
"github.com/stretchr/testify/assert"
)
diff --git a/tests/plugins/informer/.rr-informer.yaml b/tests/plugins/informer/.rr-informer.yaml
index e1edbb44..94c9a856 100644
--- a/tests/plugins/informer/.rr-informer.yaml
+++ b/tests/plugins/informer/.rr-informer.yaml
@@ -3,8 +3,8 @@ server:
user: ""
group: ""
env:
- "RR_CONFIG": "/some/place/on/the/C134"
- "RR_CONFIG2": "C138"
+ - RR_CONFIG: "/some/place/on/the/C134"
+ - RR_CONFIG: "C138"
relay: "pipes"
relay_timeout: "20s"
@@ -12,4 +12,4 @@ rpc:
listen: tcp://127.0.0.1:6001
logs:
mode: development
- level: error \ No newline at end of file
+ level: error
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
new file mode 100644
index 00000000..9973b2dc
--- /dev/null
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -0,0 +1,50 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:15395
+ max_request_size: 1024
+ middleware: ["websockets"]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+websockets:
+ # pubsubs must implement the PubSub interface to be collected via endure.Collects
+ # they should also expose RPC methods to publish data into them
+ # a pubsub may use the general config section or its own one
+
+ pubsubs: [ "redis" ]
+
+ # a sample driver-specific config section for the redis pubsub driver
+
+ redis:
+ addrs:
+ - localhost:1111
+ # path used as websockets path
+ path: "/ws"
+logs:
+ mode: development
+ level: debug
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error
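
Note: with this config the websockets middleware is mounted on /ws of the http plugin (127.0.0.1:15395) and backed by the redis pubsub. Below is a hedged sketch of a client exercising it, assuming the fasthttp/websocket client API mirrors gorilla/websocket (it is a fork); the JSON field names of the join command are assumptions, the real format is defined by pkg/pubsub.Msg.

package main

import (
	"log"

	"github.com/fasthttp/websocket"
)

func main() {
	// address and path come from .rr-websockets-init.yaml above
	conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:15395/ws", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// hypothetical join command; the real field names live in pkg/pubsub/message.go
	join := []byte(`{"command":"join","broker":"redis","topic":["foo"]}`)
	if err := conn.WriteMessage(websocket.TextMessage, join); err != nil {
		log.Fatal(err)
	}

	// the executor should answer with an "@join" acknowledgement
	_, resp, err := conn.ReadMessage()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("server replied: %s", resp)
}
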
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
new file mode 100644
index 00000000..a9b90fd0
--- /dev/null
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -0,0 +1,102 @@
+package websockets
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.InfoLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-websockets-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ // give the server a moment to start before running the sub-tests
+ time.Sleep(time.Second * 1)
+ t.Run("test1", test1)
+ t.Run("test2", test2)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func test1(t *testing.T) {
+ // placeholder, to be filled with a real websocket client scenario
+}
+
+func test2(t *testing.T) {
+ // placeholder, to be filled with a real websocket client scenario
+}
diff --git a/utils/network.go b/utils/network.go
index b73363db..86a7e733 100755
--- a/utils/network.go
+++ b/utils/network.go
@@ -12,7 +12,8 @@ import (
"github.com/valyala/tcplisten"
)
-// - SO_REUSEPORT. This option allows linear scaling server performance
+// CreateListener
+// - SO_REUSEPORT. This option allows linear scaling server performance
// on multi-CPU servers.
// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details.
//