summaryrefslogtreecommitdiff
path: root/plugins/broadcast/ws
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/ws')
-rw-r--r--plugins/broadcast/ws/commands/leave.go1
-rw-r--r--plugins/broadcast/ws/commands/subscribe.go1
-rw-r--r--plugins/broadcast/ws/connection/connection.go (renamed from plugins/broadcast/ws/connection.go)2
-rw-r--r--plugins/broadcast/ws/plugin.go25
-rw-r--r--plugins/broadcast/ws/subscriber.go41
-rw-r--r--plugins/broadcast/ws/ws_middleware.go13
6 files changed, 50 insertions, 33 deletions
diff --git a/plugins/broadcast/ws/commands/leave.go b/plugins/broadcast/ws/commands/leave.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/leave.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/commands/subscribe.go b/plugins/broadcast/ws/commands/subscribe.go
new file mode 100644
index 00000000..cdff10da
--- /dev/null
+++ b/plugins/broadcast/ws/commands/subscribe.go
@@ -0,0 +1 @@
+package commands
diff --git a/plugins/broadcast/ws/connection.go b/plugins/broadcast/ws/connection/connection.go
index 9f7bf00e..cfb47e35 100644
--- a/plugins/broadcast/ws/connection.go
+++ b/plugins/broadcast/ws/connection/connection.go
@@ -1,4 +1,4 @@
-package ws
+package connection
import (
"sync"
diff --git a/plugins/broadcast/ws/plugin.go b/plugins/broadcast/ws/plugin.go
index c9a97606..f075864b 100644
--- a/plugins/broadcast/ws/plugin.go
+++ b/plugins/broadcast/ws/plugin.go
@@ -8,10 +8,8 @@ import (
)
const (
- //
RootPluginName = "broadcast"
- //
- PluginName = "websockets"
+ PluginName = "websockets"
)
type Plugin struct {
@@ -21,7 +19,6 @@ type Plugin struct {
cfg config.Configurer
}
-
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("ws_plugin_init")
@@ -36,20 +33,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error {
- errCh := make(chan error)
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
func (p *Plugin) Name() string {
return PluginName
}
+// Provides Provide a ws implementation
func (p *Plugin) Provides() []interface{} {
return []interface{}{
p.Websocket,
@@ -57,9 +45,10 @@ func (p *Plugin) Provides() []interface{} {
}
// Websocket method should provide the Subscriber implementation to the broadcast
-func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
+func (p *Plugin) Websocket(storage broadcast.Storage) (broadcast.Subscriber, error) {
const op = errors.Op("websocket_subscriber_provide")
- ws, err := NewWSSubscriber()
+ // initialize subscriber with the storage
+ ws, err := NewWSSubscriber(storage)
if err != nil {
return nil, errors.E(op, err)
}
@@ -67,6 +56,4 @@ func (p *Plugin) Websocket() (broadcast.Subscriber, error) {
return ws, nil
}
-
-
-func (p *Plugin) Available(){}
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/ws/subscriber.go b/plugins/broadcast/ws/subscriber.go
index 2039cf95..660efdca 100644
--- a/plugins/broadcast/ws/subscriber.go
+++ b/plugins/broadcast/ws/subscriber.go
@@ -1,35 +1,50 @@
package ws
-import "github.com/spiral/roadrunner/v2/plugins/broadcast"
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast/ws/connection"
+)
type Subscriber struct {
- connections map[string]*Connection
- storage broadcast.Storage
+ connections map[string]*connection.Connection
+ storage broadcast.Storage
}
-func NewWSSubscriber() (broadcast.Subscriber, error) {
- m := make(map[string]*Connection)
+// config
+//
+func NewWSSubscriber(storage broadcast.Storage) (broadcast.Subscriber, error) {
+ m := make(map[string]*connection.Connection)
+
+ go func() {
+ app := fiber.New()
+ app.Use("/ws", wsMiddleware)
+ app.Listen(":8080")
+ }()
+
return &Subscriber{
connections: m,
+ storage: storage,
}, nil
}
-func (s *Subscriber) Subscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Subscribe(topics ...string) error {
panic("implement me")
-
-
-
-
}
-func (s *Subscriber) SubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) SubscribePattern(pattern string) error {
panic("implement me")
}
-func (s *Subscriber) Unsubscribe(upstream chan *broadcast.Message, topics ...string) error {
+func (s *Subscriber) Unsubscribe(topics ...string) error {
panic("implement me")
}
-func (s *Subscriber) UnsubscribePattern(upstream chan *broadcast.Message, pattern string) error {
+func (s *Subscriber) UnsubscribePattern(pattern string) error {
panic("implement me")
}
+
+func (s *Subscriber) Publish(messages ...*broadcast.Message) error {
+ s.storage.GetConnection(messages[9].Topic)
+ return nil
+}
diff --git a/plugins/broadcast/ws/ws_middleware.go b/plugins/broadcast/ws/ws_middleware.go
new file mode 100644
index 00000000..068ef9fb
--- /dev/null
+++ b/plugins/broadcast/ws/ws_middleware.go
@@ -0,0 +1,13 @@
+package ws
+
+import (
+ "github.com/gofiber/fiber/v2"
+ "github.com/gofiber/websocket/v2"
+)
+
+func wsMiddleware(c *fiber.Ctx) error {
+ if websocket.IsWebSocketUpgrade(c) {
+ return c.Next()
+ }
+ return fiber.ErrUpgradeRequired
+}