summaryrefslogtreecommitdiff
path: root/plugins/websockets/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r--plugins/websockets/executor/executor.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..391c9a8c
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,146 @@
+package executor
+
+import (
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+ pubsub pubsub.PubSub
+}
+
+// NewExecutor creates protected connection and starts command loop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error {
+ for {
+ mt, data, err := e.conn.Read()
+ if err != nil {
+ if mt == -1 {
+ return err
+ }
+
+ return err
+ }
+
+ msg := &pubsub.Msg{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ switch msg.Command() {
+ // handle leave
+ case commands.Join:
+ // TODO access validators model update
+ //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
+ //// validation error
+ //if err != nil {
+ // e.log.Error("validation error", "error", err)
+ //
+ // resp := &Response{
+ // Topic: "#join",
+ // Payload: msg.Topics(),
+ // }
+ //
+ // packet, err := json.Marshal(resp)
+ // if err != nil {
+ // e.log.Error("error marshal the body", "error", err)
+ // return err
+ // }
+ //
+ // err = e.conn.Write(websocket.BinaryMessage, packet)
+ // if err != nil {
+ // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ // continue
+ // }
+ //
+ // continue
+ //}
+ // associate connection with topics
+ e.storage.Store(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ err = e.pubsub.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ // handle leave
+ case commands.Leave:
+ // remove associated connections from the storage
+ e.storage.Remove(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.pubsub.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ case commands.Headers:
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command())
+ }
+ }
+}