diff options
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r-- | plugins/websockets/executor/executor.go | 26 |
1 files changed, 10 insertions, 16 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 24ea19ce..69aad7d4 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" ) @@ -23,9 +22,8 @@ type Response struct { type Executor struct { sync.Mutex - conn *connection.Connection - storage *storage.Storage - log logger.Logger + conn *connection.Connection + log logger.Logger // associated connection ID connID string @@ -39,12 +37,11 @@ type Executor struct { } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, +func NewExecutor(conn *connection.Connection, log logger.Logger, connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, - storage: bst, log: log, pubsub: pubsubs, accessValidator: av, @@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit func (e *Executor) Set(br pubsub.PubSub, topics []string) error { // associate connection with topics - err := br.Subscribe(topics...) + err := br.Subscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(topics...) + _ = br.Unsubscribe(e.connID, topics...) return err } - e.storage.InsertMany(e.connID, topics) - // save topics for the connection for i := 0; i < len(topics); i++ { e.actualTopics[topics[i]] = struct{}{} @@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { // remove associated connections from the storage - e.storage.RemoveMany(e.connID, topics) - err := br.Unsubscribe(topics...) + err := br.Unsubscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) return err @@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { } func (e *Executor) CleanUp() { + // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // remove from the bst - e.storage.Remove(e.connID, topic) - + // here for _, ps := range e.pubsub { - _ = ps.Unsubscribe(topic) + _ = ps.Unsubscribe(e.connID, topic) } } + // clean up the actualTopics data for k := range e.actualTopics { delete(e.actualTopics, k) } |