summaryrefslogtreecommitdiff
path: root/plugins/websockets/executor
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/executor')
-rw-r--r--plugins/websockets/executor/executor.go41
1 files changed, 17 insertions, 24 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 07f22043..799312ad 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,8 +7,8 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -28,8 +28,8 @@ type Executor struct {
// associated connection ID
connID string
- // map with the pubsub drivers
- pubsub map[string]pubsub.Subscriber
+ // subscriber drivers
+ sub pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,12 +38,12 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
log: log,
- pubsub: pubsubs,
+ sub: sub,
accessValidator: av,
actualTopics: make(map[string]struct{}, 10),
req: r,
@@ -126,11 +126,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
// subscribe to the topic
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Set(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Set(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
// handle leave
@@ -155,11 +153,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Leave(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Leave(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
case commands.Headers:
@@ -170,13 +166,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Set(topics []string) error {
// associate connection with topics
- err := br.Subscribe(e.connID, topics...)
+ err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(e.connID, topics...)
+ _ = e.sub.Unsubscribe(e.connID, topics...)
return err
}
@@ -188,9 +184,9 @@ func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
- err := br.Unsubscribe(e.connID, topics...)
+ err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -207,10 +203,7 @@ func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
func (e *Executor) CleanUp() {
// unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // here
- for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(e.connID, topic)
- }
+ _ = e.sub.Unsubscribe(e.connID, topic)
}
// clean up the actualTopics data