diff options
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r-- | plugins/websockets/executor/executor.go | 66 |
1 files changed, 30 insertions, 36 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 5f904d26..664b4dfd 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,12 +7,12 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) type Response struct { @@ -22,14 +22,15 @@ type Response struct { type Executor struct { sync.Mutex + // raw ws connection conn *connection.Connection log logger.Logger // associated connection ID connID string - // map with the pubsub drivers - pubsub map[string]pubsub.PubSub + // subscriber drivers + sub pubsub.Subscriber actualTopics map[string]struct{} req *http.Request @@ -38,12 +39,12 @@ type Executor struct { // NewExecutor creates protected connection and starts command loop func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { + connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, log: log, - pubsub: pubsubs, + sub: sub, accessValidator: av, actualTopics: make(map[string]struct{}, 10), req: r, @@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit err = json.Unmarshal(data, msg) if err != nil { - e.log.Error("error unmarshal message", "error", err) + e.log.Error("unmarshal message", "error", err) continue } // nil message, continue if msg == nil { - e.log.Warn("get nil message, skipping") + e.log.Warn("nil message, skipping") continue } switch msg.Command { // handle leave case commands.Join: - e.log.Debug("get join command", "msg", msg) + e.log.Debug("received join command", "msg", msg) val, err := e.accessValidator(e.req, msg.Topics...) if err != nil { @@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, errJ := json.Marshal(resp) if errJ != nil { - e.log.Error("error marshal the body", "error", errJ) + e.log.Error("marshal the body", "error", errJ) return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) } errW := e.conn.Write(packet) if errW != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", errW) + e.log.Error("write payload to the connection", "payload", packet, "error", errW) return errors.E(op, fmt.Errorf("%v,%v", err, errW)) } @@ -115,27 +116,25 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } // subscribe to the topic - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Set(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Set(msg.Topics) + if err != nil { + return errors.E(op, err) } // handle leave case commands.Leave: - e.log.Debug("get leave command", "msg", msg) + e.log.Debug("received leave command", "msg", msg) // prepare response resp := &Response{ @@ -145,21 +144,19 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Leave(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Leave(msg.Topics) + if err != nil { + return errors.E(op, err) } case commands.Headers: @@ -170,13 +167,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } } -func (e *Executor) Set(br pubsub.PubSub, topics []string) error { +func (e *Executor) Set(topics []string) error { // associate connection with topics - err := br.Subscribe(e.connID, topics...) + err := e.sub.Subscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(e.connID, topics...) + _ = e.sub.Unsubscribe(e.connID, topics...) return err } @@ -188,11 +185,11 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { return nil } -func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { +func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage - err := br.Unsubscribe(e.connID, topics...) + err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) return err } @@ -207,10 +204,7 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { func (e *Executor) CleanUp() { // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // here - for _, ps := range e.pubsub { - _ = ps.Unsubscribe(e.connID, topic) - } + _ = e.sub.Unsubscribe(e.connID, topic) } // clean up the actualTopics data |