1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
package pool
import (
"sync"
"github.com/fasthttp/websocket"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
)
// WorkersPool fans incoming pubsub messages out to websocket connections
// using a fixed set of background worker goroutines.
type WorkersPool struct {
// storage is queried with a message's topics to fill a set of keys that
// are then looked up in connections.
storage *storage.Storage
// connections holds the live connections; workers type-assert loaded
// values to *connection.Connection.
connections *sync.Map
// resPool supplies map[string]struct{} scratch sets used per message.
resPool sync.Pool
// log receives informational, warning and error records from the workers.
log logger.Logger
// queue carries messages from Queue to the workers.
queue chan *pubsub.Message
// exit is used by Stop to tell the workers to return.
exit chan struct{}
}
// NewWorkersPool builds a WorkersPool around the given topic storage,
// connection map and logger, and immediately launches its workers.
//
// The message queue is buffered for 100 messages, the exit channel is
// unbuffered, and the scratch-set pool hands out maps pre-sized for 10
// entries. Ten worker goroutines are started before the pool is returned.
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
	pool := &WorkersPool{
		storage:     storage,
		connections: connections,
		log:         log,
		queue:       make(chan *pubsub.Message, 100),
		exit:        make(chan struct{}),
		resPool: sync.Pool{
			New: func() interface{} {
				return make(map[string]struct{}, 10)
			},
		},
	}

	// spin up the fixed set of 10 workers
	for started := 0; started < 10; started++ {
		pool.do()
	}

	return pool
}
// Queue hands msg to the workers by sending it on the pool's queue channel.
// The send blocks while the channel's buffer is full.
func (wp *WorkersPool) Queue(msg *pubsub.Message) {
wp.queue <- msg
}
// Stop shuts the workers down. It sends one exit signal per worker (ten,
// matching the number started by NewWorkersPool) and then closes the exit
// channel. Every send blocks until some worker has received it.
//
// Stop must be called at most once: a second call would send on the
// already-closed exit channel and panic.
func (wp *WorkersPool) Stop() {
	const workers = 10

	for signalled := 0; signalled < workers; signalled++ {
		wp.exit <- struct{}{}
	}

	close(wp.exit)
}
// put empties res and returns it to the scratch-set pool so that a later get
// can reuse its storage. The caller must not touch res after calling put.
func (wp *WorkersPool) put(res map[string]struct{}) {
	// The compiler recognizes this range-delete loop and lowers it to a
	// single runtime map-clear call instead of per-key deletes, see
	// https://go-review.googlesource.com/c/go/+/110055/
	for k := range res {
		delete(res, k)
	}

	// Without this the pool never receives anything back and every get
	// allocates a brand-new map.
	wp.resPool.Put(res)
}

// get takes a scratch set from the pool. Sets come back empty because put
// clears them before returning them to the pool.
func (wp *WorkersPool) get() map[string]struct{} {
	return wp.resPool.Get().(map[string]struct{})
}

// do starts one worker goroutine. The worker loops until the queue channel is
// closed or a value (or close) is observed on the exit channel.
//
// For every message that is non-nil and has at least one topic, the worker
// asks storage for the matching keys, loads each key from the connections
// map and writes the payload to it as a binary websocket message. A missing
// connection or a failed write is logged and skipped; delivery continues
// with the remaining connections.
func (wp *WorkersPool) do() { //nolint:gocognit
	go func() {
		for {
			select {
			case msg, ok := <-wp.queue:
				if !ok {
					return
				}

				// nothing to deliver for nil messages or messages without topics
				if msg == nil || len(msg.Topics) == 0 {
					continue
				}

				res := wp.get()

				// collect the connections subscribed to the message topics
				wp.storage.GetByPtr(msg.Topics, res)
				if len(res) == 0 {
					wp.log.Info("no such topic", "topic", msg.Topics)
					wp.put(res)
					continue
				}

				for i := range res {
					c, ok := wp.connections.Load(i)
					if !ok {
						wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
						continue
					}

					conn := c.(*connection.Connection)
					if err := conn.Write(websocket.BinaryMessage, msg.Payload); err != nil {
						// Do NOT release res here: clearing the set while it
						// is being ranged over would stop the iteration and
						// drop the message for all remaining connections.
						wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.Broker, "topics", msg.Topics)
						continue
					}
				}

				// release the set exactly once, after the fan-out is finished
				wp.put(res)
			case <-wp.exit:
				wp.log.Info("get exit signal, exiting from the workers pool")
				return
			}
		}
	}()
}
|