1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
package memory
import (
"errors"
"sync/atomic"
)
// Memory manages broadcasting in memory.
type Memory struct {
router *Router
messages chan *Message
join, leave chan subscriber
stop chan interface{}
stopped int32
}
// memoryBroker creates new memory based message broker.
func memoryBroker() *Memory {
return &Memory{
router: NewRouter(),
messages: make(chan *Message),
join: make(chan subscriber),
leave: make(chan subscriber),
stop: make(chan interface{}),
stopped: 0,
}
}
// Serve serves broker.
func (m *Memory) Serve() error {
for {
select {
case ctx := <-m.join:
ctx.done <- m.handleJoin(ctx)
case ctx := <-m.leave:
ctx.done <- m.handleLeave(ctx)
case msg := <-m.messages:
m.router.Dispatch(msg)
case <-m.stop:
return nil
}
}
}
func (m *Memory) handleJoin(sub subscriber) (err error) {
if sub.pattern != "" {
_, err = m.router.SubscribePattern(sub.upstream, sub.pattern)
return err
}
m.router.Subscribe(sub.upstream, sub.topics...)
return nil
}
func (m *Memory) handleLeave(sub subscriber) error {
if sub.pattern != "" {
m.router.UnsubscribePattern(sub.upstream, sub.pattern)
return nil
}
m.router.Unsubscribe(sub.upstream, sub.topics...)
return nil
}
// Stop closes the consumption and disconnects broker.
func (m *Memory) Stop() {
if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) {
close(m.stop)
}
}
// Subscribe broker to one or multiple channels.
func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error {
if atomic.LoadInt32(&m.stopped) == 1 {
return errors.New("broker has been stopped")
}
ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
m.join <- ctx
return <-ctx.done
}
// SubscribePattern broker to pattern.
func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error {
if atomic.LoadInt32(&m.stopped) == 1 {
return errors.New("broker has been stopped")
}
ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
m.join <- ctx
return <-ctx.done
}
// Unsubscribe broker from one or multiple channels.
func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error {
if atomic.LoadInt32(&m.stopped) == 1 {
return errors.New("broker has been stopped")
}
ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)}
m.leave <- ctx
return <-ctx.done
}
// UnsubscribePattern broker from pattern.
func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error {
if atomic.LoadInt32(&m.stopped) == 1 {
return errors.New("broker has been stopped")
}
ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)}
m.leave <- ctx
return <-ctx.done
}
// Publish one or multiple Channel.
func (m *Memory) Publish(messages ...*Message) error {
if atomic.LoadInt32(&m.stopped) == 1 {
return errors.New("broker has been stopped")
}
for _, msg := range messages {
m.messages <- msg
}
return nil
}
|