From dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 00:09:33 +0300 Subject: - Update all main abstractions - Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski --- plugins/websockets/rpc.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 plugins/websockets/rpc.go (limited to 'plugins/websockets/rpc.go') diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go new file mode 100644 index 00000000..0f0303b7 --- /dev/null +++ b/plugins/websockets/rpc.go @@ -0,0 +1,46 @@ +package websockets + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + plugin *Plugin + log logger.Logger +} + +func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { + const op = errors.Op("broadcast_publish") + + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + err := r.plugin.Publish(mi) + if err != nil { + *ok = false + return errors.E(op, err) + } + *ok = true + return nil +} + +func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error { + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + + r.plugin.PublishAsync(mi) + + *ok = true + return nil +} -- cgit v1.2.3 From e8bffc95989041bc00d70f18a637e51c26608b60 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 00:16:37 +0300 Subject: - Comments Signed-off-by: Valery Piashchynski --- plugins/websockets/rpc.go | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/websockets/rpc.go') diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 0f0303b7..2b3ae54e 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) +// rpc collectors struct type rpc struct { plugin *Plugin log logger.Logger -- cgit v1.2.3 From 0a64bb2a71ddb6b0ee5861e255a20df1327aa099 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 28 May 2021 13:19:02 +0300 Subject: - Tests for the ws-redis, ws-memory Signed-off-by: Valery Piashchynski --- plugins/websockets/rpc.go | 1 + 1 file changed, 1 insertion(+) (limited to 'plugins/websockets/rpc.go') diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 2b3ae54e..f917bd53 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -14,6 +14,7 @@ type rpc struct { func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { const op = errors.Op("broadcast_publish") + r.log.Debug("message published", "msg", msg) // publish to the registered broker mi := make([]pubsub.Message, 0, len(msg)) -- cgit v1.2.3 From fcda08498e8f914bbd0798da898818cd5d0e4348 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 29 May 2021 00:24:30 +0300 Subject: - Add new internal plugin - channel. Which used to deliver messages from the ws plugin to the http directly Signed-off-by: Valery Piashchynski --- plugins/websockets/rpc.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) (limited to 'plugins/websockets/rpc.go') diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index f917bd53..2fb0f1b9 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -12,18 +12,17 @@ type rpc struct { log logger.Logger } -func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { +func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { const op = errors.Op("broadcast_publish") r.log.Debug("message published", "msg", msg) - // publish to the registered broker - mi := make([]pubsub.Message, 0, len(msg)) - // golang can't convert slice in-place - // so, we need to convert it manually - for i := 0; i < len(msg); i++ { - mi = append(mi, msg[i]) + // just return in case of nil message + if msg == nil { + *ok = true + return nil } - err := r.plugin.Publish(mi) + + err := r.plugin.Publish(msg) if err != nil { *ok = false return errors.E(op, err) @@ -32,16 +31,16 @@ func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { return nil } -func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error { - // publish to the registered broker - mi := make([]pubsub.Message, 0, len(msg)) - // golang can't convert slice in-place - // so, we need to convert it manually - for i := 0; i < len(msg); i++ { - mi = append(mi, msg[i]) - } +func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { + r.log.Debug("message published", "msg", msg) - r.plugin.PublishAsync(mi) + // just return in case of nil message + if msg == nil { + *ok = true + return nil + } + // publish to the registered broker + r.plugin.PublishAsync(msg) *ok = true return nil -- cgit v1.2.3