diff options
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r-- | plugins/websockets/executor/executor.go | 41 |
1 files changed, 17 insertions, 24 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 07f22043..799312ad 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,8 +7,8 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -28,8 +28,8 @@ type Executor struct { // associated connection ID connID string - // map with the pubsub drivers - pubsub map[string]pubsub.Subscriber + // subscriber drivers + sub pubsub.Subscriber actualTopics map[string]struct{} req *http.Request @@ -38,12 +38,12 @@ type Executor struct { // NewExecutor creates protected connection and starts command loop func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { + connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, log: log, - pubsub: pubsubs, + sub: sub, accessValidator: av, actualTopics: make(map[string]struct{}, 10), req: r, @@ -126,11 +126,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } // subscribe to the topic - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Set(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Set(msg.Topics) + if err != nil { + return errors.E(op, err) } // handle leave @@ -155,11 +153,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - if br, ok := e.pubsub[msg.Broker]; ok { - err = e.Leave(br, msg.Topics) - if err != nil { - return errors.E(op, err) - } + err = e.Leave(msg.Topics) + if err != nil { + return errors.E(op, err) } case commands.Headers: @@ -170,13 +166,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } } -func (e *Executor) Set(br pubsub.Subscriber, topics []string) error { +func (e *Executor) Set(topics []string) error { // associate connection with topics - err := br.Subscribe(e.connID, topics...) + err := e.sub.Subscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(e.connID, topics...) + _ = e.sub.Unsubscribe(e.connID, topics...) return err } @@ -188,9 +184,9 @@ func (e *Executor) Set(br pubsub.Subscriber, topics []string) error { return nil } -func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error { +func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage - err := br.Unsubscribe(e.connID, topics...) + err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) return err @@ -207,10 +203,7 @@ func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error { func (e *Executor) CleanUp() { // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // here - for _, ps := range e.pubsub { - _ = ps.Unsubscribe(e.connID, topic) - } + _ = e.sub.Unsubscribe(e.connID, topic) } // clean up the actualTopics data |