summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool/workers_pool.go
blob: 7fcc873b1497351748d15e6d5f8291c16c0c697e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package pool

import (
	"sync"

	"github.com/fasthttp/websocket"
	"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
	"github.com/spiral/roadrunner/v2/plugins/logger"
	"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
	"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
	"github.com/spiral/roadrunner/v2/utils"
)

// WorkersPool fans queued pubsub messages out to websocket connections.
// NewWorkersPool starts a fixed set of worker goroutines that read from queue,
// resolve the message topics to connection IDs through storage and write the
// payload to every matching connection found in connections.
type WorkersPool struct {
	// storage is asked, per topic, for the IDs of the subscribed connections
	storage     *storage.Storage
	// connections is looked up by connection ID; values are type-asserted to *connection.Connection
	connections *sync.Map
	// resPool supplies map[string]struct{} scratch sets used to collect connection IDs
	resPool     sync.Pool
	// log receives diagnostics from the workers
	log         logger.Logger

	// queue carries the messages waiting to be delivered by a worker
	queue chan *message.Message
	// exit tells the workers to return
	exit  chan struct{}
}

// NewWorkersPool builds a WorkersPool around the given topic storage,
// connection registry and logger, and launches its ten worker goroutines
// before returning. Incoming messages are buffered in a queue of capacity 100.
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
	pool := new(WorkersPool)
	pool.storage = storage
	pool.connections = connections
	pool.log = log
	pool.queue = make(chan *message.Message, 100)
	pool.exit = make(chan struct{})

	// scratch sets handed out by get(); each one starts with room for 10 entries
	pool.resPool.New = func() interface{} {
		return make(map[string]struct{}, 10)
	}

	// start 10 workers
	for started := 0; started != 10; started++ {
		pool.do()
	}

	return pool
}

// Queue hands msg over to the workers by sending it on the internal queue
// channel. The send blocks for as long as the channel buffer is full.
func (wp *WorkersPool) Queue(msg *message.Message) {
	wp.queue <- msg
}

// Stop shuts the workers down: it sends ten exit signals, one per worker
// started by NewWorkersPool, and then closes the exit channel. Each send
// waits until a worker picks the signal up. Stop must be called only once,
// a second call would send on the already closed channel and panic.
func (wp *WorkersPool) Stop() {
	for remaining := 10; remaining > 0; remaining-- {
		wp.exit <- struct{}{}
	}

	close(wp.exit)
}

// put empties res and hands it back to resPool so that a later get can reuse
// it instead of allocating a fresh map. The caller must not touch res after
// this call and must call put exactly once per map obtained from get.
func (wp *WorkersPool) put(res map[string]struct{}) {
	// The compiler recognizes this range-and-delete idiom and turns it into a
	// single map-clear call: https://go-review.googlesource.com/c/go/+/110055/
	for k := range res {
		delete(res, k)
	}

	wp.resPool.Put(res)
}

// get returns a scratch set from resPool; the pool's New function creates one
// when the pool has nothing to offer.
func (wp *WorkersPool) get() map[string]struct{} {
	return wp.resPool.Get().(map[string]struct{})
}

// do starts one worker goroutine. The worker loops until the queue channel is
// closed or an exit signal arrives. For every non-nil message with at least
// one topic it collects the IDs of the connections subscribed to the message
// topics and writes the payload, as a binary websocket message, to each of
// those connections that is still registered.
func (wp *WorkersPool) do() { //nolint:gocognit
	go func() {
		for {
			select {
			case msg, ok := <-wp.queue:
				if !ok {
					return
				}
				if msg == nil || msg.TopicsLength() == 0 {
					continue
				}

				// topicNames is evaluated only on the logging paths. msg.Topics is
				// an indexed accessor, so passing it to the logger directly would
				// log a function value rather than the topics.
				topicNames := func() []string {
					names := make([]string, 0, msg.TopicsLength())
					for i := 0; i < msg.TopicsLength(); i++ {
						names = append(names, utils.AsString(msg.Topics(i)))
					}
					return names
				}

				res := wp.get()
				for i := 0; i < msg.TopicsLength(); i++ {
					// get connections for the particular topic
					wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
				}
				if len(res) == 0 {
					wp.log.Info("no such topic", "topic", topicNames())
					wp.put(res)
					continue
				}

				// NOTE(review): msg.Broker is passed to the logger as-is below; if it
				// is an accessor method like Topics it has to be called — confirm.
				for id := range res {
					c, ok := wp.connections.Load(id)
					if !ok {
						wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", topicNames())
						continue
					}

					conn := c.(*connection.Connection)
					// TODO sync pool for the bytes
					bb := make([]byte, msg.PayloadLength())
					for j := range bb {
						bb[j] = byte(msg.Payload(j))
					}
					if err := conn.Write(websocket.BinaryMessage, bb); err != nil {
						// A failed write affects this connection only. res must stay
						// intact here: clearing it while ranging over it would skip the
						// remaining connections, and it is released once after the loop.
						wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", topicNames(), "error", err)
						continue
					}
				}

				wp.put(res)
			case <-wp.exit:
				wp.log.Info("get exit signal, exiting from the workers pool")
				return
			}
		}
	}()
}