1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
package _____
import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"net/rpc"
"sync"
)
// Config gives access to configuration data: it can be narrowed down to a
// nested section and decoded into a caller-supplied structure.
type Config interface {
	// Get returns the nested config section (sub-map) stored under the given
	// service name, or nil if the section is not found.
	Get(service string) Config

	// Unmarshal decodes the config data into the given struct.
	Unmarshal(out interface{}) error
}
// dsnError is the error value for an invalid socket DSN; its message lists
// two example DSN forms (tcp://:6001 and unix://sock.unix).
var dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
// Bus keeps a list of registered services, tracks which of them were enabled
// by configuration, and serves and stops the enabled ones as a group.
type Bus struct {
	// services holds every service added through Register.
	services []Service

	// wg counts the per-service goroutines started by Serve.
	wg sync.WaitGroup

	// enabled holds the services selected by Configure.
	enabled []Service

	// NOTE(review): stop is not referenced by the methods in this part of the
	// file; presumably a shutdown signal - confirm against its users.
	stop chan interface{}

	// rpc is the net/rpc server created by Serve.
	rpc *rpc.Server

	// NOTE(review): rpcConfig is not referenced by the methods in this part of
	// the file; presumably settings for the RPC server - confirm.
	rpcConfig *RPCConfig
}
// Register appends s to the list of services known to the bus.
func (b *Bus) Register(s Service) {
	b.services = append(b.services, s)
}
// Services returns the slice of registered services. The slice is returned
// as-is, without copying.
func (b *Bus) Services() []Service {
	return b.services
}
// Configure resets the list of enabled services and rebuilds it by offering
// each registered service its own config section, looked up by service name.
//
// A service with no section is skipped (with a debug log entry). A service
// whose Configure reports false is skipped silently. The first error returned
// by a service aborts the walk and is returned unchanged.
func (b *Bus) Configure(cfg Config) error {
	b.enabled = []Service{}

	for _, svc := range b.services {
		section := cfg.Get(svc.Name())
		if section == nil {
			// nothing to configure this service with, leave it disabled
			logrus.Debugf("%s: no config has been provided", svc.Name())
			continue
		}

		ok, err := svc.Configure(section)
		if err != nil {
			return err
		}

		if ok {
			b.enabled = append(b.enabled, svc)
		}
	}

	return nil
}
// Serve starts every enabled service and blocks until all of them have
// returned.
//
// A new net/rpc server is created on each call. When a service's RPC method
// returns a non-nil value, that value is registered on the server under the
// service name; a failed registration is logged and the service is still
// started. Each service's Serve runs in its own goroutine, and an error it
// returns is logged rather than propagated.
func (b *Bus) Serve() {
	b.rpc = rpc.NewServer()

	for _, s := range b.enabled {
		// some candidates might provide net/rpc api for internal communications
		if api := s.RPC(); api != nil {
			if err := b.rpc.RegisterName(s.Name(), api); err != nil {
				logrus.Errorf("%s.rpc: %s", s.Name(), err)
			}
		}

		b.wg.Add(1)

		// The service is passed as an argument instead of being captured:
		// before Go 1.22 the range variable is shared by all iterations, so a
		// capturing closure could end up serving the wrong service.
		go func(s Service) {
			defer b.wg.Done()

			if err := s.Serve(); err != nil {
				logrus.Errorf("%s.start: %s", s.Name(), err)
			}
		}(s)
	}

	b.wg.Wait()
}
// Stop calls Stop on every enabled service in order, logging (not returning)
// any error, and then waits on the bus wait group, i.e. until the goroutines
// started by Serve have finished.
func (b *Bus) Stop() {
	for _, svc := range b.enabled {
		err := svc.Stop()
		if err == nil {
			continue
		}

		logrus.Errorf("%s.stop: %s", svc.Name(), err)
	}

	b.wg.Wait()
}
|