summaryrefslogtreecommitdiff
path: root/plugins/broadcast/websockets/conn_context.go
blob: f7d6283313ea2f0abe07ba3d284412cbf10fff46 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package websockets

import (
	"encoding/json"

	"github.com/gorilla/websocket"
)

// ConnContext carries information about websocket connection and it's topics.
type ConnContext struct {
	// Conn to the client.
	Conn *websocket.Conn

	// Topics contain list of currently subscribed topics.
	Topics []string

	// upstream to push messages into.
	upstream chan *broadcast.Message
}

// SendMessage message directly to the client.
func (ctx *ConnContext) SendMessage(topic string, payload interface{}) (err error) {
	msg := &broadcast.Message{Topic: topic}
	msg.Payload, err = json.Marshal(payload)

	if err == nil {
		ctx.upstream <- msg
	}

	return err
}

func (ctx *ConnContext) serve(errHandler func(err error, conn *websocket.Conn)) {
	for msg := range ctx.upstream {
		if err := ctx.Conn.WriteJSON(msg); err != nil {
			errHandler(err, ctx.Conn)
		}
	}
}

func (ctx *ConnContext) addTopics(topics ...string) {
	for _, topic := range topics {
		found := false
		for _, e := range ctx.Topics {
			if e == topic {
				found = true
				break
			}
		}

		if !found {
			ctx.Topics = append(ctx.Topics, topic)
		}
	}
}

func (ctx *ConnContext) dropTopic(topics ...string) {
	for _, topic := range topics {
		for i, e := range ctx.Topics {
			if e == topic {
				ctx.Topics[i] = ctx.Topics[len(ctx.Topics)-1]
				ctx.Topics = ctx.Topics[:len(ctx.Topics)-1]
			}
		}
	}
}