summaryrefslogtreecommitdiff
path: root/balancer.go
blob: 29fcd8d5c870564b2006fbfedd474a1429f4c527 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package roadrunner

import (
	"os/exec"
	"sync"
)

// Balancer provides ability to perform hot-swap between 2 worker pools.
type Balancer struct {
	mu   sync.Mutex // protects pool hot swapping
	pool *Pool      // pool to work for user commands
}

// Spawn initiates underlying pool of workers and replaced old one.
func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	var (
		err error
		old *Pool
	)

	old = b.pool
	if b.pool, err = NewPool(cmd, factory, cfg); err != nil {
		return err
	}

	if old != nil {
		go func() {
			old.Close()
		}()
	}

	return nil
}

// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
// being destroyed.
func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
	b.mu.Lock()
	pool := b.pool
	b.mu.Unlock()

	return pool.Execute(payload, ctx)
}

// Workers return list of active workers.
func (b *Balancer) Workers() []*Worker {
	b.mu.Lock()
	defer b.mu.Unlock()

	return b.pool.Workers()
}

// Close closes underlying pool.
func (b *Balancer) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.pool.Close()
	b.pool = nil
}