summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/bst/bst.go16
-rw-r--r--pkg/bst/interface.go2
-rw-r--r--pkg/pubsub/interface.go13
-rw-r--r--plugins/http/plugin.go6
-rw-r--r--plugins/memory/plugin.go45
-rw-r--r--plugins/redis/fanin.go13
-rw-r--r--plugins/redis/plugin.go75
-rw-r--r--plugins/websockets/executor/executor.go26
-rw-r--r--plugins/websockets/plugin.go48
-rw-r--r--plugins/websockets/pool/workers_pool.go63
10 files changed, 229 insertions, 78 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
index 664937ba..f8426b12 100644
--- a/pkg/bst/bst.go
+++ b/pkg/bst/bst.go
@@ -52,6 +52,22 @@ func (b *BST) Insert(uuid string, topic string) {
}
}
+func (b *BST) Contains(topic string) bool {
+ curr := b
+ for curr != nil {
+ switch {
+ case topic < curr.topic:
+ curr = curr.left
+ case topic > curr.topic:
+ curr = curr.right
+ case topic == curr.topic:
+ return true
+ }
+ }
+
+ return false
+}
+
func (b *BST) Get(topic string) map[string]struct{} {
curr := b
for curr != nil {
diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go
index ecf40414..95b03e11 100644
--- a/pkg/bst/interface.go
+++ b/pkg/bst/interface.go
@@ -8,4 +8,6 @@ type Storage interface {
Remove(uuid, topic string)
// Get will return all connections associated with the topic
Get(topic string) map[string]struct{}
+ // Contains checks if the BST contains a topic
+ Contains(topic string) bool
}
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index 18c1a80c..2d5d9595 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -2,6 +2,10 @@ package pubsub
import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+/*
+This interface is in BETA. It might be changed.
+*/
+
// PubSub ...
type PubSub interface {
Publisher
@@ -10,15 +14,20 @@ type PubSub interface {
}
// Subscriber defines the ability to operate as message passing broker.
+// BETA interface
type Subscriber interface {
// Subscribe broker to one or multiple topics.
- Subscribe(topics ...string) error
+ Subscribe(connectionID string, topics ...string) error
// Unsubscribe from one or multiply topics
- Unsubscribe(topics ...string) error
+ Unsubscribe(connectionID string, topics ...string) error
+
+ // Connections returns all connections associated with the particular topic
+ Connections(topic string, ret map[string]struct{})
}
// Publisher publish one or more messages
+// BETA interface
type Publisher interface {
// Publish one or multiple Channel.
Publish(messages []byte) error
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 1952679a..ba83344a 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -248,6 +248,12 @@ func (p *Plugin) Stop() error {
// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ p.log.Error("body close", "error", err)
+ }
+ }()
if headerContainsUpgrade(r) {
http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
return
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index eb87b39e..6732ff5d 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,6 +3,7 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
@@ -13,17 +14,19 @@ const (
)
type Plugin struct {
+ sync.RWMutex
log logger.Logger
// channel with the messages from the RPC
pushCh chan []byte
// user-subscribed topics
- topics sync.Map
+ storage bst.Storage
}
func (p *Plugin) Init(log logger.Logger) error {
p.log = log
- p.pushCh = make(chan []byte, 100)
+ p.pushCh = make(chan []byte, 10)
+ p.storage = bst.NewBST()
return nil
}
@@ -35,44 +38,62 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Publish(messages []byte) error {
- p.pushCh <- messages
+func (p *Plugin) Publish(message []byte) error {
+ p.pushCh <- message
return nil
}
-func (p *Plugin) PublishAsync(messages []byte) {
+func (p *Plugin) PublishAsync(message []byte) {
go func() {
- p.pushCh <- messages
+ p.pushCh <- message
}()
}
-func (p *Plugin) Subscribe(topics ...string) error {
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
for i := 0; i < len(topics); i++ {
- p.topics.Store(topics[i], struct{}{})
+ p.storage.Insert(connectionID, topics[i])
}
return nil
}
-func (p *Plugin) Unsubscribe(topics ...string) error {
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ p.Lock()
+ defer p.Unlock()
for i := 0; i < len(topics); i++ {
- p.topics.Delete(topics[i])
+ p.storage.Remove(connectionID, topics[i])
}
return nil
}
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ p.RLock()
+ defer p.RUnlock()
+
+ ret := p.storage.Get(topic)
+ for rr := range ret {
+ res[rr] = struct{}{}
+ }
+}
+
func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
-
if msg == nil {
return nil, nil
}
+ p.RLock()
+ defer p.RUnlock()
+
fbsMsg := message.GetRootAsMessage(msg, 0)
// push only messages, which are subscribed
// TODO better???
for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ // if we have active subscribers - send a message to a topic
+ // or send nil instead
+ if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
return fbsMsg, nil
}
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 3082f24f..321bfaaa 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -15,6 +15,7 @@ import (
type FanIn struct {
sync.Mutex
+ // redis client
client redis.UniversalClient
pubsub *redis.PubSub
@@ -26,7 +27,7 @@ type FanIn struct {
exit chan struct{}
}
-func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
out := make(chan *message.Message, 100)
fi := &FanIn{
out: out,
@@ -42,7 +43,7 @@ func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
return fi
}
-func (fi *FanIn) AddChannel(topics ...string) error {
+func (fi *FanIn) sub(topics ...string) error {
const op = errors.Op("fanin_addchannel")
err := fi.pubsub.Subscribe(context.Background(), topics...)
if err != nil {
@@ -71,22 +72,22 @@ func (fi *FanIn) read() {
}
}
-func (fi *FanIn) RemoveChannel(topics ...string) error {
+func (fi *FanIn) unsub(topic string) error {
const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ err := fi.pubsub.Unsubscribe(context.Background(), topic)
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (fi *FanIn) Stop() error {
+func (fi *FanIn) stop() error {
fi.exit <- struct{}{}
close(fi.out)
close(fi.exit)
return nil
}
-func (fi *FanIn) Consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *message.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 5b9de5fc..7b5721f4 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -15,7 +15,7 @@ import (
const PluginName = "redis"
type Plugin struct {
- sync.Mutex
+ sync.RWMutex
// config for RR integration
cfg *Config
// logger
@@ -23,6 +23,7 @@ type Plugin struct {
// redis universal client
universalClient redis.UniversalClient
+ // fanIn implementation used to deliver messages from all channels to the single websocket point
fanin *FanIn
}
@@ -70,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
})
// init fanin
- p.fanin = NewFanIn(p.universalClient, log)
+ p.fanin = newFanIn(p.universalClient, log)
return nil
}
@@ -82,7 +83,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
const op = errors.Op("redis_plugin_stop")
- err := p.fanin.Stop()
+ err := p.fanin.stop()
if err != nil {
return errors.E(op, err)
}
@@ -132,15 +133,73 @@ func (p *Plugin) PublishAsync(msg []byte) {
}()
}
-func (p *Plugin) Subscribe(topics ...string) error {
- return p.fanin.AddChannel(topics...)
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ // just add a connection
+ for i := 0; i < len(topics); i++ {
+ // key - topic
+ // value - connectionID
+ hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID)
+ res, err := hset.Result()
+ if err != nil {
+ return err
+ }
+ if res == 0 {
+ p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ continue
+ }
+ }
+
+ // and subscribe after
+ return p.fanin.sub(topics...)
}
-func (p *Plugin) Unsubscribe(topics ...string) error {
- return p.fanin.RemoveChannel(topics...)
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ // Remove topics from the storage
+ for i := 0; i < len(topics); i++ {
+ srem := p.universalClient.SRem(context.Background(), topics[i], connectionID)
+ if srem.Err() != nil {
+ return srem.Err()
+ }
+ }
+
+ for i := 0; i < len(topics); i++ {
+ // if there are no such topics, we can safely unsubscribe from the redis
+ ssc := p.universalClient.SMembers(context.Background(), topics[i])
+ res, err := ssc.Result()
+ if err != nil {
+ return err
+ }
+
+ // if we have associated connections - skip
+ if len(res) > 0 {
+ continue
+ }
+
+ // else - unsubscribe
+ err = p.fanin.unsub(topics[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ hget := p.universalClient.SMembersMap(context.Background(), topic)
+ r, err := hget.Result()
+ if err != nil {
+ panic(err)
+ }
+
+ // assighn connections
+ // res expected to be from the sync.Pool
+ for k := range r {
+ res[k] = struct{}{}
+ }
}
// Next return next message
func (p *Plugin) Next() (*message.Message, error) {
- return <-p.fanin.Consume(), nil
+ return <-p.fanin.consume(), nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 24ea19ce..69aad7d4 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
)
@@ -23,9 +22,8 @@ type Response struct {
type Executor struct {
sync.Mutex
- conn *connection.Connection
- storage *storage.Storage
- log logger.Logger
+ conn *connection.Connection
+ log logger.Logger
// associated connection ID
connID string
@@ -39,12 +37,11 @@ type Executor struct {
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage,
+func NewExecutor(conn *connection.Connection, log logger.Logger,
connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
- storage: bst,
log: log,
pubsub: pubsubs,
accessValidator: av,
@@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
// associate connection with topics
- err := br.Subscribe(topics...)
+ err := br.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(topics...)
+ _ = br.Unsubscribe(e.connID, topics...)
return err
}
- e.storage.InsertMany(e.connID, topics)
-
// save topics for the connection
for i := 0; i < len(topics); i++ {
e.actualTopics[topics[i]] = struct{}{}
@@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
// remove associated connections from the storage
- e.storage.RemoveMany(e.connID, topics)
- err := br.Unsubscribe(topics...)
+ err := br.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
}
func (e *Executor) CleanUp() {
+ // unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // remove from the bst
- e.storage.Remove(e.connID, topic)
-
+ // here
for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(topic)
+ _ = ps.Unsubscribe(e.connID, topic)
}
}
+ // clean up the actualTopics data
for k := range e.actualTopics {
delete(e.actualTopics, k)
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index fe55d30e..4c0edcad 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -24,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -43,7 +42,6 @@ type Plugin struct {
// global connections map
connections sync.Map
- storage *storage.Storage
// GO workers pool
workersPool *pool.WorkersPool
@@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.pubsubs = make(map[string]pubsub.PubSub)
p.log = log
- p.storage = storage.NewStorage()
- p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
}
p.serveExit = make(chan struct{})
p.server = server
@@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
+ p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+
// run all pubsubs drivers
for _, v := range p.pubsubs {
go func(ps pubsub.PubSub) {
@@ -133,8 +133,13 @@ func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
p.Lock()
+ if p.phpPool == nil {
+ p.Unlock()
+ return nil
+ }
p.phpPool.Destroy(context.Background())
p.Unlock()
+
return nil
}
@@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// store connection
p.connections.Store(connectionID, safeConn)
- defer func() {
- // close the connection on exit
- err = safeConn.Close()
- if err != nil {
- p.log.Error("connection close", "error", err)
- }
-
- // when exiting - delete the connection
- p.connections.Delete(connectionID)
- }()
-
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
- defer e.CleanUp()
err = e.StartCommandLoop()
if err != nil {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
+
+ // when exiting - delete the connection
+ p.connections.Delete(connectionID)
+
+ // remove connection from all topics from all pub-sub drivers
+ e.CleanUp()
+
+ err = r.Body.Close()
+ if err != nil {
+ p.log.Error("body close", "error", err)
+ }
+
+ // close the connection on exit
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close", "error", err)
+ }
+
+ safeConn = nil
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
- p.RLock()
- defer p.RUnlock()
const op = errors.Op("access_validator")
p.log.Debug("validation", "topics", topics)
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 7fcc873b..544f3ede 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,20 +1,22 @@
package pool
import (
+ "bytes"
"sync"
"github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
- storage *storage.Storage
+ storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
+ bPool sync.Pool
log logger.Logger
queue chan *message.Message
@@ -22,11 +24,11 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *message.Message, 100),
- storage: storage,
+ storage: pubsubs,
log: log,
exit: make(chan struct{}),
}
@@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
+ wp.bPool.New = func() interface{} {
+ return new(bytes.Buffer)
+ }
// start 10 workers
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.do()
}
@@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) {
}
func (wp *WorkersPool) Stop() {
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.exit <- struct{}{}
}
@@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
+func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
+ b.Reset()
+ wp.bPool.Put(b)
+}
+
+func (wp *WorkersPool) getBytes() *bytes.Buffer {
+ return wp.bPool.Get().(*bytes.Buffer)
+}
+
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg.TopicsLength() == 0 {
continue
}
+
+ br, ok := wp.storage[utils.AsString(msg.Broker())]
+ if !ok {
+ wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ continue
+ }
+
res := wp.get()
+ bb := wp.getBytes()
+
for i := 0; i < msg.TopicsLength(); i++ {
// get connections for the particular topic
- wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ br.Connections(utils.AsString(msg.Topics(i)), res)
}
+
if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ }
+ wp.putBytes(bb)
wp.put(res)
continue
}
@@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
conn := c.(*connection.Connection)
- // TODO sync pool for the bytes
- bb := make([]byte, msg.PayloadLength())
+
+ // put data into the bytes buffer
for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
+ bb.WriteByte(byte(msg.Payload(i)))
}
- err := conn.Write(websocket.BinaryMessage, bb)
+ err := conn.Write(websocket.BinaryMessage, bb.Bytes())
if err != nil {
- wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
- wp.put(res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
}
+ // put bytes buffer back
+ wp.putBytes(bb)
+ // put map with results back
wp.put(res)
case <-wp.exit:
wp.log.Info("get exit signal, exiting from the workers pool")