blob: 29fcd8d5c870564b2006fbfedd474a1429f4c527 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package roadrunner
import (
"os/exec"
"sync"
)
// Balancer provides ability to perform hot-swap between 2 worker pools.
//
// It holds a single active pool guarded by a mutex. Spawn installs a pool and
// Close resets the pool to nil; the zero value of Balancer has no pool.
type Balancer struct {
	// mu protects pool hot swapping: every read or write of the pool field
	// in this type's methods happens while mu is held.
	mu sync.Mutex
	// pool is the pool to work for user commands; nil when no pool is set.
	pool *Pool
}
// Spawn creates a new worker pool from the given command builder, factory and
// configuration and makes it the active pool, replacing the previous one.
//
// The new pool is built before anything is swapped. When NewPool returns an
// error, that error is returned as is and the currently active pool (if any)
// stays in place, so a failed respawn does not discard a working pool.
//
// After a successful swap the previous pool, if there was one, is closed in a
// separate goroutine; Spawn returns without waiting for that Close call.
func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	next, err := NewPool(cmd, factory, cfg)
	if err != nil {
		// Leave b.pool untouched: assigning the result of a failed NewPool
		// call would drop the reference to the old pool without closing it.
		return err
	}

	old := b.pool
	b.pool = next

	if old != nil {
		// The replaced pool is closed on its own goroutine while new calls
		// already go to the freshly installed pool.
		go old.Close()
	}

	return nil
}
// Execute runs one task on the pool that is active at the moment of the call.
// The payload and ctx are handed to the pool's Execute method and its three
// results (response, response context, error) are returned unchanged.
//
// The mutex is held only while the pool pointer is read, not while the task
// runs. Must not be used once pool is being destroyed.
func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
	// Take a snapshot of the current pool under the lock and release the lock
	// before the task is executed.
	active := func() *Pool {
		b.mu.Lock()
		defer b.mu.Unlock()
		return b.pool
	}()

	resp, rCtx, err = active.Execute(payload, ctx)
	return resp, rCtx, err
}
// Workers returns the list of workers reported by the active pool.
//
// It returns nil when the balancer currently has no pool, which is the case
// for a zero-value Balancer and after Close has been called.
func (b *Balancer) Workers() []*Worker {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.pool == nil {
		// No pool to ask; avoid calling a method on a nil *Pool.
		return nil
	}

	return b.pool.Workers()
}
// Close closes the underlying pool and clears the balancer's reference to it.
//
// Close is a no-op when the balancer has no pool, so it may be called more
// than once, or on a Balancer that never had a pool, without touching a nil
// *Pool. The mutex is held for the whole call, including the pool's Close.
func (b *Balancer) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.pool == nil {
		return
	}

	b.pool.Close()
	b.pool = nil
}
|