summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linters.yml2
-rwxr-xr-x.golangci.yml9
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/worker_watcher/interface.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go6
-rw-r--r--plugins/broadcast/config.go9
-rw-r--r--plugins/broadcast/doc/ws.drawio1
-rw-r--r--plugins/broadcast/interface.go28
-rw-r--r--plugins/broadcast/memory/bst/bst.go134
-rw-r--r--plugins/broadcast/memory/bst/bst_test.go33
-rw-r--r--plugins/broadcast/memory/bst/interface.go11
-rw-r--r--plugins/broadcast/memory/driver.go28
-rw-r--r--plugins/broadcast/plugin.go16
-rw-r--r--plugins/broadcast/redis/driver.go28
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/connection/connection.go (renamed from plugins/broadcast/ws/connection.go)2
-rw-r--r--plugins/broadcast/ws/plugin.go25
-rw-r--r--plugins/broadcast/ws/subscriber.go41
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
-rw-r--r--plugins/gzip/plugin.go5
-rw-r--r--plugins/kv/storage.go2
-rw-r--r--plugins/server/plugin.go4
26 files changed, 326 insertions, 86 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 82072675..24d839e5 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -13,6 +13,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
- version: v1.39 # without patch version
+ version: v1.41 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m
diff --git a/.golangci.yml b/.golangci.yml
index bfc69f57..41e1d68f 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -14,8 +14,10 @@ output:
linters-settings:
govet:
check-shadowing: true
- golint:
- min-confidence: 0.1
+ revive:
+ confidence: 0.8
+ errorCode: 0
+ warningCode: 0
gocyclo:
min-complexity: 15
godot:
@@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- gocritic # The most opinionated Go source code linter
- gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification
- goimports # Goimports does everything that gofmt does. Additionally it checks unused imports
- - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
+ - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes
- goprintffuncname # Checks that printf-like functions are named with `f` at the end
- gosec # Inspects source code for security problems
- gosimple # Linter for Go source code that specializes in simplifying a code
@@ -88,3 +90,4 @@ issues:
- goconst
- noctx
- gosimple
+ - revive
diff --git a/go.mod b/go.mod
index a479eb51..1172b38c 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
github.com/gofiber/websocket/v2 v2.0.3
github.com/golang/mock v1.4.4
github.com/google/flatbuffers v1.12.1
+ github.com/google/uuid v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.12.2 // indirect
diff --git a/go.sum b/go.sum
index 2a7e635c..e68684d6 100644
--- a/go.sum
+++ b/go.sum
@@ -172,6 +172,8 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index d57cc95c..8c9d69b9 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint:golint,stylecheck
+ err_encoder ErrorEncoder //nolint:stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 40903db3..ca61dbc4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -15,7 +15,7 @@ import (
const MB = 1024 * 1024
// NSEC_IN_SEC nanoseconds in second
-const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
type Supervised interface {
Pool
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 4625b7a7..29fa3640 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -23,9 +23,9 @@ type Watcher interface {
// Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all container w/o removing it from internal storage
+ // List return all container w/o removing it from internal storage
List() []worker.BaseProcess
- // RemoveWorker remove worker from the container
+ // Remove will remove worker from the container
Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 5aec4ee6..557563ac 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -1,4 +1,4 @@
-package worker_watcher //nolint:golint,stylecheck
+package worker_watcher //nolint:stylecheck
import (
"context"
@@ -11,7 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead container will not be replaced
+// NewSyncWorkerWatcher is a constructor for the Watcher
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
@@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
}
-// Warning, this is O(n) operation, and it will return copy of the actual workers
+// List - this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 03bf6510..5e7b7f20 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -3,16 +3,17 @@ package broadcast
/*
broadcast:
ws-us-region-1:
- subscriber: ws
- path: "/ws"
+ subscriber: websockets
+ middleware: ["headers", "gzip"] # ????
+ address: "localhost:53223"
+ path: "/ws"
- driver: redis
+ storage: redis
address:
- 6379
db: 0
*/
-
// Config represents configuration for the ws plugin
type Config struct {
// Sections represent particular broadcast plugin section
diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio
new file mode 100644
index 00000000..739b797a
--- /dev/null
+++ b/plugins/broadcast/doc/ws.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 3ed8b412..47c779b5 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,25 +1,29 @@
package broadcast
-import "encoding/json"
+import (
+ "encoding/json"
+)
// Subscriber defines the ability to operate as message passing broker.
type Subscriber interface {
// Subscribe broker to one or multiple topics.
- Subscribe(upstream chan *Message, topics ...string) error
-
- // SubscribePattern broker to pattern.
- SubscribePattern(upstream chan *Message, pattern string) error
-
- // Unsubscribe broker from one or multiple topics.
- Unsubscribe(upstream chan *Message, topics ...string) error
-
+ Subscribe(topics ...string) error
// UnsubscribePattern broker from pattern.
- UnsubscribePattern(upstream chan *Message, pattern string) error
+ UnsubscribePattern(pattern string) error
}
+// Storage used to store patterns and topics
type Storage interface {
- Store(topics ...string)
- StorePattern(pattern string)
+ // Store connection uuid associated with the provided topics
+ Store(uuid string, topics ...string)
+ // StorePattern stores pattern associated with the particular connection
+ StorePattern(uuid string, pattern string)
+
+ // GetConnection returns connections for the particular pattern
+ GetConnection(pattern string) []string
+
+ // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example)
+ Construct(key string) (Storage, error)
}
type Publisher interface {
diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go
new file mode 100644
index 00000000..7d09a10f
--- /dev/null
+++ b/plugins/broadcast/memory/bst/bst.go
@@ -0,0 +1,134 @@
+package bst
+
+// BST ...
+type BST struct {
+ // registered topic, not unique
+ topic string
+ // associated connections with the topic
+ uuids map[string]struct{}
+
+ // left and right subtrees
+ left *BST
+ right *BST
+}
+
+func NewBST() Storage {
+ return &BST{}
+}
+
+// Insert uuid to the topic
+func (B *BST) Insert(uuid string, topic string) {
+ curr := B
+
+ for {
+ if curr.topic == topic {
+ curr.uuids[uuid] = struct{}{}
+ return
+ }
+ // if topic less than curr topic
+ if curr.topic < topic {
+ if curr.left == nil {
+ curr.left = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+ // move forward
+ curr = curr.left
+ } else {
+ if curr.right == nil {
+ curr.right = &BST{
+ topic: topic,
+ uuids: map[string]struct{}{uuid: {}},
+ }
+ return
+ }
+
+ curr = curr.right
+ }
+ }
+}
+
+func (B *BST) Get(topic string) map[string]struct{} {
+ curr := B
+ for curr != nil {
+ if curr.topic == topic {
+ return curr.uuids
+ }
+ if curr.topic < topic {
+ curr = curr.left
+ }
+ if curr.topic > topic {
+ curr = curr.right
+ }
+ }
+
+ return nil
+}
+
+func (B *BST) Remove(uuid string, topic string) {
+ B.removeHelper(uuid, topic, nil)
+}
+
+func (B *BST) removeHelper(uuid string, topic string, parent *BST) {
+ curr := B
+ for curr != nil {
+ if topic < curr.topic {
+ parent = curr
+ curr = curr.left
+ } else if topic > curr.topic {
+ parent = curr
+ curr = curr.right
+ } else {
+ if len(curr.uuids) > 1 {
+ if _, ok := curr.uuids[uuid]; ok {
+ delete(curr.uuids, uuid)
+ return
+ }
+ }
+
+ if curr.left != nil && curr.right != nil {
+ curr.topic, curr.uuids = curr.right.traverseForMinString()
+ curr.right.removeHelper(curr.topic, uuid, curr)
+ } else if parent == nil {
+ if curr.left != nil {
+ curr.topic = curr.left.topic
+ curr.uuids = curr.left.uuids
+
+ curr.right = curr.left.right
+ curr.left = curr.left.left
+ } else if curr.right != nil {
+ curr.topic = curr.right.topic
+ curr.uuids = curr.right.uuids
+
+ curr.left = curr.right.left
+ curr.right = curr.right.right
+ } else {
+ // single node tree
+ }
+ } else if parent.left == curr {
+ if curr.left != nil {
+ parent.left = curr.left
+ } else {
+ parent.left = curr.right
+ }
+ } else if parent.right == curr {
+ if curr.left != nil {
+ parent.right = curr.left
+ } else {
+ parent.right = curr.right
+ }
+ }
+ break
+ }
+ }
+}
+
+//go:inline
+func (B *BST) traverseForMinString() (string, map[string]struct{}) {
+ if B.left == nil {
+ return B.topic, B.uuids
+ }
+ return B.left.traverseForMinString()
+}
diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go
new file mode 100644
index 00000000..b5ad6c10
--- /dev/null
+++ b/plugins/broadcast/memory/bst/bst_test.go
@@ -0,0 +1,33 @@
+package bst
+
+import (
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewBST(t *testing.T) {
+ g := NewBST()
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments2")
+ }
+
+ for i := 0; i < 100; i++ {
+ g.Insert(uuid.NewString(), "comments3")
+ }
+
+ exist := g.Get("comments")
+ assert.Len(t, exist, 100)
+
+ exist2 := g.Get("comments2")
+ assert.Len(t, exist2, 100)
+
+ exist3 := g.Get("comments3")
+ assert.Len(t, exist3, 100)
+}
diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go
new file mode 100644
index 00000000..ecf40414
--- /dev/null
+++ b/plugins/broadcast/memory/bst/interface.go
@@ -0,0 +1,11 @@
+package bst
+
+// Storage is general in-memory BST storage implementation
+type Storage interface {
+ // Insert inserts to a vertex with topic ident connection uuid
+ Insert(uuid string, topic string)
+ // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
+ Remove(uuid, topic string)
+ // Get will return all connections associated with the topic
+ Get(topic string) map[string]struct{}
+}
diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go
index 2eb45c8e..80527e4b 100644
--- a/plugins/broadcast/memory/driver.go
+++ b/plugins/broadcast/memory/driver.go
@@ -1,39 +1,29 @@
package memory
-import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+import (
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+)
type Driver struct {
}
-func NewInMemoryDriver() broadcast.Subscriber {
+func NewInMemoryDriver() broadcast.Storage {
b := &Driver{}
return b
}
-func (d *Driver) Serve() error {
+func (d *Driver) Store(uuid string, topics ...string) {
panic("implement me")
}
-func (d *Driver) Stop() {
+func (d *Driver) StorePattern(uuid string, pattern string) {
panic("implement me")
}
-func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (d *Driver) GetConnection(pattern string) []string {
panic("implement me")
}
-func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
- panic("implement me")
-}
-
-func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
- panic("implement me")
-}
-
-func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
- panic("implement me")
-}
-
-func (d *Driver) Publish(messages ...*broadcast.Message) error {
- panic("implement me")
+func (d *Driver) Construct(key string) (broadcast.Storage, error) {
+ return nil, nil
}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 7ad9e2ae..156bea80 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -47,6 +47,12 @@ func (p *Plugin) Serve() chan error {
return errCh
}
+ if p.driver == nil {
+ // Or if no storage detected, use in-memory storage
+ errCh <- errors.E(op, errors.Str("no storage detected"))
+ return errCh
+ }
+
// start the underlying broker
go func() {
// err := p.broker.Serve()
@@ -72,12 +78,16 @@ func (p *Plugin) Name() string {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.CollectBroker,
+ p.CollectSubscriber,
}
}
-func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) {
- p.broker = broker
+func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) {
+ p.broker = subscriber
+}
+
+func (p *Plugin) CollectStorage(name endure.Named, storage Storage) {
+ p.driver = storage
}
func (p *Plugin) RPC() interface{} {
diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go
index 65a229e1..556d5f03 100644
--- a/plugins/broadcast/redis/driver.go
+++ b/plugins/broadcast/redis/driver.go
@@ -1 +1,29 @@
package redis
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+)
+
+type Driver struct {
+}
+
+func NewInMemoryDriver() broadcast.Storage {
+ b := &Driver{}
+ return b
+}
+
+func (d *Driver) Store(uuid string, topics ...string) {
+ panic("implement me")
+}
+
+func (d *Driver) StorePattern(uuid string, pattern string) {
+ panic("implement me")
+}
+
+func (d *Driver) GetConnection(pattern string) []string {
+ panic("implement me")
+}
+
+func (d *Driver) Construct(key string) (broadcast.Storage, error) {
+ panic("implement me")
+}
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/leave.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/subscribe.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection/connection.go
index 9f7bf00e..cfb47e35 100644
--- a/plugins/broadcast/ws/connection.go
+++ b/plugins/broadcast/ws/connection/connection.go
@@ -1,4 +1,4 @@
-package ws
+package connection
import (
"sync"
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
index c9a97606..f075864b 100644
--- a/plugins/broadcast/ws/plugin.go
+++ b/plugins/broadcast/ws/plugin.go
@@ -8,10 +8,8 @@ import (
)
const (
- //
RootPluginName = "broadcast"
- //
- PluginName = "websockets"
+ PluginName = "websockets"
)
type Plugin struct {
@@ -21,7 +19,6 @@ type Plugin struct {
cfg config.Configurer
}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("ws_plugin_init")
@@ -36,20 +33,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error {
- errCh := make(chan error)
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
func (p *Plugin) Name() string {
return PluginName
}
+// Provides Provide a ws implementation
func (p *Plugin) Provides() []interface{} {
return []interface{}{
p.Websocket,
@@ -57,9 +45,10 @@ func (p *Plugin) Provides() []interface{} {
}
// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
+func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
const op = errors.Op("websocket_subscriber_provide")
- ws, err := NewWSSubscriber()
+ // initialize subscriber with the storage
+ ws, err := NewWSSubscriber(storage)
if err != nil {
return nil, errors.E(op, err)
}
@@ -67,6 +56,4 @@ func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
return ws, nil
}
-
-
-func (p *Plugin) Available(){}
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
index 2039cf95..660efdca 100644
--- a/plugins/broadcast/ws/subscriber.go
+++ b/plugins/broadcast/ws/subscriber.go
@@ -1,35 +1,50 @@
package ws
-import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
+)
type Subscriber struct {
- connections map[string]*Connection
- storage broadcast.Storage
+ connections map[string]*connection.Connection
+ storage broadcast.Storage
}
-func NewWSSubscriber() (broadcast.Subscriber, error) {
- m := make(map[string]*Connection)
+// config
+//
+func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
+ m := make(map[string]*connection.Connection)
+
+ go func() {
+ app := fiber.New()
+ app.Use("/ws", wsMiddleware)
+ app.Listen(":8080")
+ }()
+
return &Subscriber{
connections: m,
+ storage: storage,
}, nil
}
-func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Subscribe(topics ...string) error {
panic("implement me")
-
-
-
-
}
-func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) SubscribePattern(pattern string) error {
panic("implement me")
}
-func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Unsubscribe(topics ...string) error {
panic("implement me")
}
-func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) UnsubscribePattern(pattern string) error {
panic("implement me")
}
+
+func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
+ s.storage.GetConnection(messages[9].Topic)
+ return nil
+}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
new file mode 100644
index 00000000..068ef9fb
--- /dev/null
+++ b/plugins/broadcast/ws/ws_middleware.go
@@ -0,0 +1,13 @@
+package ws
+
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/gofiber/websocket/v2"
+)
+
+func wsMiddleware(c *fiber.Ctx) error {
+ if websocket.IsWebSocketUpgrade(c) {
+ return c.Next()
+ }
+ return fiber.ErrUpgradeRequired
+}
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go
index 24b125fb..133704f8 100644
--- a/plugins/gzip/plugin.go
+++ b/plugins/gzip/plugin.go
@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/NYTimes/gziphandler"
+ "github.com/gofiber/fiber/v2"
)
const PluginName = "gzip"
@@ -21,6 +22,10 @@ func (g *Plugin) Middleware(next http.Handler) http.Handler {
})
}
+func (g *Plugin) FiberMiddleware(ctx fiber.Ctx) {
+
+}
+
// Available interface implementation
func (g *Plugin) Available() {}
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index 319915c5..9a609735 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -92,7 +92,7 @@ func (p *Plugin) Serve() chan error {
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
- // at this point we know, that driver field present in the cofiguration
+ // at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
if _, ok := p.drivers[memcached]; !ok {
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index ef77f7ab..aab9dcde 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -25,9 +25,9 @@ import (
const PluginName = "server"
// RR_RELAY env variable key (internal)
-const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+const RR_RELAY = "RR_RELAY" //nolint:stylecheck
// RR_RPC env variable key (internal) if the RPC presents
-const RR_RPC = "RR_RPC" //nolint:golint,stylecheck
+const RR_RPC = "RR_RPC" //nolint:stylecheck
// Plugin manages worker
type Plugin struct {