diff options
26 files changed, 326 insertions, 86 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 82072675..24d839e5 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -13,6 +13,6 @@ jobs: - name: Run linter uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action> with: - version: v1.39 # without patch version + version: v1.41 # without patch version only-new-issues: false # show only new issues if it's a pull request args: --timeout=10m diff --git a/.golangci.yml b/.golangci.yml index bfc69f57..41e1d68f 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -14,8 +14,10 @@ output: linters-settings: govet: check-shadowing: true - golint: - min-confidence: 0.1 + revive: + confidence: 0.8 + errorCode: 0 + warningCode: 0 gocyclo: min-complexity: 15 godot: @@ -55,7 +57,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/ - gocritic # The most opinionated Go source code linter - gofmt # Gofmt checks whether code was gofmt-ed. By default this tool runs with -s option to check for code simplification - goimports # Goimports does everything that gofmt does. Additionally it checks unused imports - - golint # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes + - revive # Golint differs from gofmt. Gofmt reformats Go source code, whereas golint prints out style mistakes - goprintffuncname # Checks that printf-like functions are named with `f` at the end - gosec # Inspects source code for security problems - gosimple # Linter for Go source code that specializes in simplifying a code @@ -88,3 +90,4 @@ issues: - goconst - noctx - gosimple + - revive @@ -17,6 +17,7 @@ require ( github.com/gofiber/websocket/v2 v2.0.3 github.com/golang/mock v1.4.4 github.com/google/flatbuffers v1.12.1 + github.com/google/uuid v1.2.0 // indirect github.com/hashicorp/go-multierror v1.1.1 github.com/json-iterator/go v1.1.11 github.com/klauspost/compress v1.12.2 // indirect @@ -172,6 +172,8 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index d57cc95c..8c9d69b9 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -47,7 +47,7 @@ type StaticPool struct { allocator worker.Allocator // err_encoder is the default Exec error encoder - err_encoder ErrorEncoder //nolint:golint,stylecheck + err_encoder ErrorEncoder //nolint:stylecheck } // Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 40903db3..ca61dbc4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -15,7 +15,7 @@ import ( const MB = 1024 * 1024 // NSEC_IN_SEC nanoseconds in second -const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck +const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck type Supervised interface { Pool diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 4625b7a7..29fa3640 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -23,9 +23,9 @@ type Watcher interface { // Destroy destroys the underlying container Destroy(ctx context.Context) - // WorkersList return all container w/o removing it from internal storage + // List return all container w/o removing it from internal storage List() []worker.BaseProcess - // RemoveWorker remove worker from the container + // Remove will remove worker from the container Remove(wb worker.BaseProcess) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 5aec4ee6..557563ac 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -1,4 +1,4 @@ -package worker_watcher //nolint:golint,stylecheck +package worker_watcher //nolint:stylecheck import ( "context" @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container" ) -// workerCreateFunc can be nil, but in that case, dead container will not be replaced +// NewSyncWorkerWatcher is a constructor for the Watcher func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ container: container.NewVector(numWorkers), @@ -215,7 +215,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } } -// Warning, this is O(n) operation, and it will return copy of the actual workers +// List - this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 03bf6510..5e7b7f20 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -3,16 +3,17 @@ package broadcast /* broadcast: ws-us-region-1: - subscriber: ws - path: "/ws" + subscriber: websockets + middleware: ["headers", "gzip"] # ???? + address: "localhost:53223" + path: "/ws" - driver: redis + storage: redis address: - 6379 db: 0 */ - // Config represents configuration for the ws plugin type Config struct { // Sections represent particular broadcast plugin section diff --git a/plugins/broadcast/doc/ws.drawio b/plugins/broadcast/doc/ws.drawio new file mode 100644 index 00000000..739b797a --- /dev/null +++ b/plugins/broadcast/doc/ws.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go index 3ed8b412..47c779b5 100644 --- a/plugins/broadcast/interface.go +++ b/plugins/broadcast/interface.go @@ -1,25 +1,29 @@ package broadcast -import "encoding/json" +import ( + "encoding/json" +) // Subscriber defines the ability to operate as message passing broker. type Subscriber interface { // Subscribe broker to one or multiple topics. - Subscribe(upstream chan *Message, topics ...string) error - - // SubscribePattern broker to pattern. - SubscribePattern(upstream chan *Message, pattern string) error - - // Unsubscribe broker from one or multiple topics. - Unsubscribe(upstream chan *Message, topics ...string) error - + Subscribe(topics ...string) error // UnsubscribePattern broker from pattern. - UnsubscribePattern(upstream chan *Message, pattern string) error + UnsubscribePattern(pattern string) error } +// Storage used to store patterns and topics type Storage interface { - Store(topics ...string) - StorePattern(pattern string) + // Store connection uuid associated with the provided topics + Store(uuid string, topics ...string) + // StorePattern stores pattern associated with the particular connection + StorePattern(uuid string, pattern string) + + // GetConnection returns connections for the particular pattern + GetConnection(pattern string) []string + + // Construct is a constructor for the storage according to the provided configuration key (broadcast.websocket for example) + Construct(key string) (Storage, error) } type Publisher interface { diff --git a/plugins/broadcast/memory/bst/bst.go b/plugins/broadcast/memory/bst/bst.go new file mode 100644 index 00000000..7d09a10f --- /dev/null +++ b/plugins/broadcast/memory/bst/bst.go @@ -0,0 +1,134 @@ +package bst + +// BST ... +type BST struct { + // registered topic, not unique + topic string + // associated connections with the topic + uuids map[string]struct{} + + // left and right subtrees + left *BST + right *BST +} + +func NewBST() Storage { + return &BST{} +} + +// Insert uuid to the topic +func (B *BST) Insert(uuid string, topic string) { + curr := B + + for { + if curr.topic == topic { + curr.uuids[uuid] = struct{}{} + return + } + // if topic less than curr topic + if curr.topic < topic { + if curr.left == nil { + curr.left = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + // move forward + curr = curr.left + } else { + if curr.right == nil { + curr.right = &BST{ + topic: topic, + uuids: map[string]struct{}{uuid: {}}, + } + return + } + + curr = curr.right + } + } +} + +func (B *BST) Get(topic string) map[string]struct{} { + curr := B + for curr != nil { + if curr.topic == topic { + return curr.uuids + } + if curr.topic < topic { + curr = curr.left + } + if curr.topic > topic { + curr = curr.right + } + } + + return nil +} + +func (B *BST) Remove(uuid string, topic string) { + B.removeHelper(uuid, topic, nil) +} + +func (B *BST) removeHelper(uuid string, topic string, parent *BST) { + curr := B + for curr != nil { + if topic < curr.topic { + parent = curr + curr = curr.left + } else if topic > curr.topic { + parent = curr + curr = curr.right + } else { + if len(curr.uuids) > 1 { + if _, ok := curr.uuids[uuid]; ok { + delete(curr.uuids, uuid) + return + } + } + + if curr.left != nil && curr.right != nil { + curr.topic, curr.uuids = curr.right.traverseForMinString() + curr.right.removeHelper(curr.topic, uuid, curr) + } else if parent == nil { + if curr.left != nil { + curr.topic = curr.left.topic + curr.uuids = curr.left.uuids + + curr.right = curr.left.right + curr.left = curr.left.left + } else if curr.right != nil { + curr.topic = curr.right.topic + curr.uuids = curr.right.uuids + + curr.left = curr.right.left + curr.right = curr.right.right + } else { + // single node tree + } + } else if parent.left == curr { + if curr.left != nil { + parent.left = curr.left + } else { + parent.left = curr.right + } + } else if parent.right == curr { + if curr.left != nil { + parent.right = curr.left + } else { + parent.right = curr.right + } + } + break + } + } +} + +//go:inline +func (B *BST) traverseForMinString() (string, map[string]struct{}) { + if B.left == nil { + return B.topic, B.uuids + } + return B.left.traverseForMinString() +} diff --git a/plugins/broadcast/memory/bst/bst_test.go b/plugins/broadcast/memory/bst/bst_test.go new file mode 100644 index 00000000..b5ad6c10 --- /dev/null +++ b/plugins/broadcast/memory/bst/bst_test.go @@ -0,0 +1,33 @@ +package bst + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestNewBST(t *testing.T) { + g := NewBST() + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments2") + } + + for i := 0; i < 100; i++ { + g.Insert(uuid.NewString(), "comments3") + } + + exist := g.Get("comments") + assert.Len(t, exist, 100) + + exist2 := g.Get("comments2") + assert.Len(t, exist2, 100) + + exist3 := g.Get("comments3") + assert.Len(t, exist3, 100) +} diff --git a/plugins/broadcast/memory/bst/interface.go b/plugins/broadcast/memory/bst/interface.go new file mode 100644 index 00000000..ecf40414 --- /dev/null +++ b/plugins/broadcast/memory/bst/interface.go @@ -0,0 +1,11 @@ +package bst + +// Storage is general in-memory BST storage implementation +type Storage interface { + // Insert inserts to a vertex with topic ident connection uuid + Insert(uuid string, topic string) + // Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed + Remove(uuid, topic string) + // Get will return all connections associated with the topic + Get(topic string) map[string]struct{} +} diff --git a/plugins/broadcast/memory/driver.go b/plugins/broadcast/memory/driver.go index 2eb45c8e..80527e4b 100644 --- a/plugins/broadcast/memory/driver.go +++ b/plugins/broadcast/memory/driver.go @@ -1,39 +1,29 @@ package memory -import "github.com/spiral/roadrunner/v2/plugins/broadcast" +import ( + "github.com/spiral/roadrunner/v2/plugins/broadcast" +) type Driver struct { } -func NewInMemoryDriver() broadcast.Subscriber { +func NewInMemoryDriver() broadcast.Storage { b := &Driver{} return b } -func (d *Driver) Serve() error { +func (d *Driver) Store(uuid string, topics ...string) { panic("implement me") } -func (d *Driver) Stop() { +func (d *Driver) StorePattern(uuid string, pattern string) { panic("implement me") } -func (d *Driver) Subscribe(upstream chan *broadcast.Message, topics ...string) error { +func (d *Driver) GetConnection(pattern string) []string { panic("implement me") } -func (d *Driver) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { - panic("implement me") -} - -func (d *Driver) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { - panic("implement me") -} - -func (d *Driver) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { - panic("implement me") -} - -func (d *Driver) Publish(messages ...*broadcast.Message) error { - panic("implement me") +func (d *Driver) Construct(key string) (broadcast.Storage, error) { + return nil, nil } diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 7ad9e2ae..156bea80 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -47,6 +47,12 @@ func (p *Plugin) Serve() chan error { return errCh } + if p.driver == nil { + // Or if no storage detected, use in-memory storage + errCh <- errors.E(op, errors.Str("no storage detected")) + return errCh + } + // start the underlying broker go func() { // err := p.broker.Serve() @@ -72,12 +78,16 @@ func (p *Plugin) Name() string { func (p *Plugin) Collects() []interface{} { return []interface{}{ - p.CollectBroker, + p.CollectSubscriber, } } -func (p *Plugin) CollectBroker(name endure.Named, broker Subscriber) { - p.broker = broker +func (p *Plugin) CollectSubscriber(name endure.Named, subscriber Subscriber) { + p.broker = subscriber +} + +func (p *Plugin) CollectStorage(name endure.Named, storage Storage) { + p.driver = storage } func (p *Plugin) RPC() interface{} { diff --git a/plugins/broadcast/redis/driver.go b/plugins/broadcast/redis/driver.go index 65a229e1..556d5f03 100644 --- a/plugins/broadcast/redis/driver.go +++ b/plugins/broadcast/redis/driver.go @@ -1 +1,29 @@ package redis + +import ( + "github.com/spiral/roadrunner/v2/plugins/broadcast" +) + +type Driver struct { +} + +func NewInMemoryDriver() broadcast.Storage { + b := &Driver{} + return b +} + +func (d *Driver) Store(uuid string, topics ...string) { + panic("implement me") +} + +func (d *Driver) StorePattern(uuid string, pattern string) { + panic("implement me") +} + +func (d *Driver) GetConnection(pattern string) []string { + panic("implement me") +} + +func (d *Driver) Construct(key string) (broadcast.Storage, error) { + panic("implement me") +} diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go new file mode 100644 index 00000000..cdff10da --- /dev/null +++ b/plugins/broadcast/ws/commands/leave.go @@ -0,0 +1 @@ +package commands diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go new file mode 100644 index 00000000..cdff10da --- /dev/null +++ b/plugins/broadcast/ws/commands/subscribe.go @@ -0,0 +1 @@ +package commands diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection/connection.go index 9f7bf00e..cfb47e35 100644 --- a/plugins/broadcast/ws/connection.go +++ b/plugins/broadcast/ws/connection/connection.go @@ -1,4 +1,4 @@ -package ws +package connection import ( "sync" diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go index c9a97606..f075864b 100644 --- a/plugins/broadcast/ws/plugin.go +++ b/plugins/broadcast/ws/plugin.go @@ -8,10 +8,8 @@ import ( ) const ( - // RootPluginName = "broadcast" - // - PluginName = "websockets" + PluginName = "websockets" ) type Plugin struct { @@ -21,7 +19,6 @@ type Plugin struct { cfg config.Configurer } - func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("ws_plugin_init") @@ -36,20 +33,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { - errCh := make(chan error) - - return errCh -} - -func (p *Plugin) Stop() error { - return nil -} - func (p *Plugin) Name() string { return PluginName } +// Provides Provide a ws implementation func (p *Plugin) Provides() []interface{} { return []interface{}{ p.Websocket, @@ -57,9 +45,10 @@ func (p *Plugin) Provides() []interface{} { } // Websocket method should provide the Subscriber implementation to the broadcast -func (p *Plugin) Websocket() (broadcast.Subscriber, error) { +func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) { const op = errors.Op("websocket_subscriber_provide") - ws, err := NewWSSubscriber() + // initialize subscriber with the storage + ws, err := NewWSSubscriber(storage) if err != nil { return nil, errors.E(op, err) } @@ -67,6 +56,4 @@ func (p *Plugin) Websocket() (broadcast.Subscriber, error) { return ws, nil } - - -func (p *Plugin) Available(){} +func (p *Plugin) Available() {} diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go index 2039cf95..660efdca 100644 --- a/plugins/broadcast/ws/subscriber.go +++ b/plugins/broadcast/ws/subscriber.go @@ -1,35 +1,50 @@ package ws -import "github.com/spiral/roadrunner/v2/plugins/broadcast" +import ( + "github.com/gofiber/fiber/v2" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection" +) type Subscriber struct { - connections map[string]*Connection - storage broadcast.Storage + connections map[string]*connection.Connection + storage broadcast.Storage } -func NewWSSubscriber() (broadcast.Subscriber, error) { - m := make(map[string]*Connection) +// config +// +func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) { + m := make(map[string]*connection.Connection) + + go func() { + app := fiber.New() + app.Use("/ws", wsMiddleware) + app.Listen(":8080") + }() + return &Subscriber{ connections: m, + storage: storage, }, nil } -func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error { +func (s *Subscriber) Subscribe(topics ...string) error { panic("implement me") - - - - } -func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error { +func (s *Subscriber) SubscribePattern(pattern string) error { panic("implement me") } -func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error { +func (s *Subscriber) Unsubscribe(topics ...string) error { panic("implement me") } -func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error { +func (s *Subscriber) UnsubscribePattern(pattern string) error { panic("implement me") } + +func (s *Subscriber) Publish(messages ...*broadcast.Message) error { + s.storage.GetConnection(messages[9].Topic) + return nil +} diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go new file mode 100644 index 00000000..068ef9fb --- /dev/null +++ b/plugins/broadcast/ws/ws_middleware.go @@ -0,0 +1,13 @@ +package ws + +import ( + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + +func wsMiddleware(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return c.Next() + } + return fiber.ErrUpgradeRequired +} diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go index 24b125fb..133704f8 100644 --- a/plugins/gzip/plugin.go +++ b/plugins/gzip/plugin.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/NYTimes/gziphandler" + "github.com/gofiber/fiber/v2" ) const PluginName = "gzip" @@ -21,6 +22,10 @@ func (g *Plugin) Middleware(next http.Handler) http.Handler { }) } +func (g *Plugin) FiberMiddleware(ctx fiber.Ctx) { + +} + // Available interface implementation func (g *Plugin) Available() {} diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index 319915c5..9a609735 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -92,7 +92,7 @@ func (p *Plugin) Serve() chan error { // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) - // at this point we know, that driver field present in the cofiguration + // at this point we know, that driver field present in the configuration switch v.(map[string]interface{})[driver] { case memcached: if _, ok := p.drivers[memcached]; !ok { diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index ef77f7ab..aab9dcde 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -25,9 +25,9 @@ import ( const PluginName = "server" // RR_RELAY env variable key (internal) -const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck +const RR_RELAY = "RR_RELAY" //nolint:stylecheck // RR_RPC env variable key (internal) if the RPC presents -const RR_RPC = "RR_RPC" //nolint:golint,stylecheck +const RR_RPC = "RR_RPC" //nolint:stylecheck // Plugin manages worker type Plugin struct { |