summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/amqp/conn.go
blob: be747776f7a8698318308409714fc9f74925e41d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package amqp

import (
	"fmt"
	"github.com/cenkalti/backoff/v4"
	"github.com/streadway/amqp"
	"sync"
	"time"
)

// chanPool owns one AMQP connection plus the named channels allocated on it,
// and tracks whether that connection is currently usable.
type chanPool struct {
	// tout is used as the maximum backoff interval between redial attempts.
	tout time.Duration
	// url is the AMQP address used for the initial dial and every redial.
	url string

	// mu protects conn, channels and the connected field.
	mu *sync.Mutex

	// conn is nil while the connection is down.
	conn *amqp.Connection
	// channels maps a channel name to its allocation; nil while disconnected.
	channels map[string]*channel
	// wait is closed by Close to tell the watch goroutine to stop.
	wait chan interface{}
	// connected is closed while the connection is usable (or after watch has
	// given up redialing) and is replaced by an open channel when it drops.
	connected chan interface{}
}

// channel is a single AMQP channel together with the Go channels its users
// listen on.
type channel struct {
	// ch is the underlying AMQP channel.
	ch *amqp.Channel
	// TODO: consumer tag field is currently unused
	//consumer string

	// confirm receives publisher confirmations (registered via NotifyPublish).
	confirm chan amqp.Confirmation
	// signal receives the connection error from watch, or nil from closeChan.
	signal chan error
}

// newConn dials url and wraps the resulting connection in a chanPool that is
// already marked as connected. It starts the watch goroutine, which redials
// (backoff interval capped at tout) whenever the connection is lost.
func newConn(url string, tout time.Duration) (*chanPool, error) {
	conn, dialErr := dial(url)
	if dialErr != nil {
		return nil, dialErr
	}

	// a closed connected channel means "connection is usable right now"
	connected := make(chan interface{})
	close(connected)

	pool := &chanPool{
		url:       url,
		tout:      tout,
		conn:      conn,
		mu:        new(sync.Mutex),
		channels:  map[string]*channel{},
		wait:      make(chan interface{}),
		connected: connected,
	}

	go pool.watch()

	return pool, nil
}

// dial opens a new AMQP connection to url with the library's default settings.
func dial(url string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(url)
	return conn, err
}

// Close gracefully closes all underlying channels and the connection and
// tells the watch goroutine to stop.
//
// It returns an error when the pool was already closed, when the connection
// is currently dead (watch cleared it and has not yet redialed), or when
// closing the AMQP connection itself fails.
func (cp *chanPool) Close() error {
	cp.mu.Lock()

	// wait is closed only here and only under mu, so this check-and-close is
	// atomic; closing it a second time would panic.
	select {
	case <-cp.wait:
		cp.mu.Unlock()
		return fmt.Errorf("connection pool is already closed")
	default:
		close(cp.wait)
	}

	if cp.channels == nil {
		// The connection is down. Release the lock before returning, otherwise
		// every later caller of channel/closeChan/waitConnected deadlocks.
		cp.mu.Unlock()
		return fmt.Errorf("connection is dead")
	}

	// close all channels concurrently; closeChan takes mu itself, so these
	// goroutines only proceed once we unlock below
	var wg sync.WaitGroup
	for _, ch := range cp.channels {
		wg.Add(1)

		go func(ch *channel) {
			defer wg.Done()
			cp.closeChan(ch, nil)
		}(ch)
	}
	cp.mu.Unlock()

	wg.Wait()

	cp.mu.Lock()
	defer cp.mu.Unlock()

	if cp.conn != nil {
		return cp.conn.Close()
	}

	return nil
}

// waitConnected returns the channel that is closed once the connection is
// restored or watch gives up redialing.
// Call it only after a connection error has been received on channel.signal;
// before that, the returned channel may still be the previous (already closed)
// one.
func (cp *chanPool) waitConnected() chan interface{} {
	cp.mu.Lock()
	connected := cp.connected
	cp.mu.Unlock()

	return connected
}

// watch supervises the AMQP connection until the pool is closed.
//
// When the connection reports a close event, watch does the following:
//   - broadcasts the event to every allocated channel's signal;
//   - marks the pool as disconnected (conn and channels set to nil,
//     connected replaced by an open channel);
//   - redials with exponential backoff whose interval is capped at cp.tout.
//
// connected is closed again once redialing succeeds or is abandoned. Redialing
// is abandoned when the backoff gives up or when Close is called meanwhile.
func (cp *chanPool) watch() {
	for {
		select {
		case <-cp.wait:
			// connection has been closed via Close
			return

		// here we are waiting for the errors from amqp connection; a new
		// notifier is registered on every pass because cp.conn is replaced
		// after each successful redial
		case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
			if err == nil {
				// A nil value means the notifier was closed without an error.
				// If that shutdown came from Close, there is nothing to redial.
				select {
				case <-cp.wait:
					return
				default:
				}
			}

			cp.mu.Lock()
			// clear connected, since connections are dead
			cp.connected = make(chan interface{})

			// broadcast error to all consume to let them for the tryReconnect
			for _, ch := range cp.channels {
				ch.signal <- err
			}

			// disable channel allocation while server is dead
			cp.conn = nil
			cp.channels = nil
			cp.mu.Unlock()

			// initialize the backoff
			expb := backoff.NewExponentialBackOff()
			expb.MaxInterval = cp.tout

			// reconnect function
			reconnect := func() error {
				// stop retrying as soon as the pool has been closed
				select {
				case <-cp.wait:
					return backoff.Permanent(fmt.Errorf("connection pool is closed"))
				default:
				}

				// Dial without holding mu, so channel, waitConnected and Close
				// are not blocked for the duration of the dial. conn is nil
				// meanwhile, so nothing else can use it.
				conn, err := dial(cp.url)
				if err != nil {
					// still failing
					fmt.Printf("error during the amqp dialing, %s\n", err)
					return err
				}

				cp.mu.Lock()
				defer cp.mu.Unlock()

				// Close may have run while we were dialing. It saw channels == nil
				// and returned, so we must drop the new connection ourselves.
				select {
				case <-cp.wait:
					_ = conn.Close() // best effort; the pool is shutting down
					return backoff.Permanent(fmt.Errorf("connection pool is closed"))
				default:
				}

				// TODO ADD LOGGING
				fmt.Println("------amqp successfully redialed------")

				// here we are reconnected
				// replace the connection
				cp.conn = conn
				// re-init the channels
				cp.channels = make(map[string]*channel)
				return nil
			}

			// start backoff retry
			errb := backoff.Retry(reconnect, expb)
			if errb != nil {
				fmt.Printf("backoff Retry error, %s\n", errb)
				// reconnection failed or the pool was closed; release waiters
				close(cp.connected)
				return
			}
			close(cp.connected)
		}
	}
}

// channel returns the channel registered under name. If none is registered,
// it allocates a new one on the current connection with publisher confirms
// enabled.
//
// If the connection is currently down, it waits up to 2*cp.tout for watch to
// restore it. It returns an error if that wait times out, if the connection is
// still nil afterwards, or if opening or configuring the AMQP channel fails.
func (cp *chanPool) channel(name string) (*channel, error) {
	cp.mu.Lock()
	dead := cp.conn == nil
	// Read connected in the same critical section as conn: watch replaces it
	// in the same critical section in which it clears conn. Reading it
	// unlocked would race with watch.
	connected := cp.connected
	cp.mu.Unlock()

	if dead {
		// wait for connection restoration (doubled the timeout duration)
		timer := time.NewTimer(cp.tout * 2)
		defer timer.Stop()

		select {
		case <-timer.C:
			return nil, fmt.Errorf("connection is dead")
		case <-connected:
			// connected, or watch gave up; conn is re-checked below
		}
	}

	cp.mu.Lock()
	defer cp.mu.Unlock()

	if cp.conn == nil {
		return nil, fmt.Errorf("connection has been closed")
	}

	if ch, ok := cp.channels[name]; ok {
		return ch, nil
	}

	// we must create new channel
	ch, err := cp.conn.Channel()
	if err != nil {
		return nil, err
	}

	// Enable publish confirmations
	if err = ch.Confirm(false); err != nil {
		// Don't leak the half-initialised channel. Its close error is secondary
		// to the confirm failure being reported.
		_ = ch.Close()
		return nil, fmt.Errorf("unable to enable confirmation mode on channel: %w", err)
	}

	// we expect that every allocated channel would have listener on signal
	// this is not true only in case of pure producing channels
	cp.channels[name] = &channel{
		ch:      ch,
		confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
		signal:  make(chan error, 1),
	}

	return cp.channels[name], nil
}

// closeChan unregisters c from the pool and closes it. A nil is sent on
// c.signal and the AMQP channel is then closed on a separate goroutine, so
// closeChan does not wait for either step. It always returns err unchanged.
//
// NOTE(review): signal is created with capacity 1 in channel(). If it still
// holds an undrained value, the goroutine blocks forever and c.ch.Close is
// never reached.
func (cp *chanPool) closeChan(c *channel, err error) error {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	go func(target *channel) {
		target.signal <- nil
		target.ch.Close()
	}(c)

	// drop every registration that points at c
	for key, candidate := range cp.channels {
		if candidate != c {
			continue
		}
		delete(cp.channels, key)
	}

	return err
}