summaryrefslogtreecommitdiff
path: root/plugins/websockets/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r--plugins/websockets/executor/executor.go26
1 files changed, 10 insertions, 16 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 24ea19ce..69aad7d4 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
)
@@ -23,9 +22,8 @@ type Response struct {
type Executor struct {
sync.Mutex
- conn *connection.Connection
- storage *storage.Storage
- log logger.Logger
+ conn *connection.Connection
+ log logger.Logger
// associated connection ID
connID string
@@ -39,12 +37,11 @@ type Executor struct {
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage,
+func NewExecutor(conn *connection.Connection, log logger.Logger,
connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
- storage: bst,
log: log,
pubsub: pubsubs,
accessValidator: av,
@@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
// associate connection with topics
- err := br.Subscribe(topics...)
+ err := br.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(topics...)
+ _ = br.Unsubscribe(e.connID, topics...)
return err
}
- e.storage.InsertMany(e.connID, topics)
-
// save topics for the connection
for i := 0; i < len(topics); i++ {
e.actualTopics[topics[i]] = struct{}{}
@@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
// remove associated connections from the storage
- e.storage.RemoveMany(e.connID, topics)
- err := br.Unsubscribe(topics...)
+ err := br.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
}
func (e *Executor) CleanUp() {
+ // unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // remove from the bst
- e.storage.Remove(e.connID, topic)
-
+ // here
for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(topic)
+ _ = ps.Unsubscribe(e.connID, topic)
}
}
+ // clean up the actualTopics data
for k := range e.actualTopics {
delete(e.actualTopics, k)
}