summaryrefslogtreecommitdiff
path: root/cmd/_____/bus.go
blob: 813a6c3b13611df4feb23a21127d73f229138388 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package _____

import (
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"net/rpc"
	"sync"
)

// Config provides ability to slice configuration sections and unmarshal configuration data into
// given structure.
type Config interface {
	// Get nested config section (sub-map), returns nil if section not found.
	Get(service string) Config

	// Unmarshal unmarshal config data into given struct.
	Unmarshal(out interface{}) error
}

var (
	dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
)

type Bus struct {
	services  []Service
	wg        sync.WaitGroup
	enabled   []Service
	stop      chan interface{}
	rpc       *rpc.Server
	rpcConfig *RPCConfig
}

func (b *Bus) Register(s Service) {
	b.services = append(b.services, s)
}

func (b *Bus) Services() []Service {
	return b.services
}

func (b *Bus) Configure(cfg Config) error {
	b.enabled = make([]Service, 0)

	for _, s := range b.services {
		segment := cfg.Get(s.Name())
		if segment == nil {
			// no config has been provided for the Service
			logrus.Debugf("%s: no config has been provided", s.Name())
			continue
		}

		if enable, err := s.Configure(segment); err != nil {
			return err
		} else if !enable {
			continue
		}

		b.enabled = append(b.enabled, s)
	}

	return nil
}

func (b *Bus) Serve() {
	b.rpc = rpc.NewServer()

	for _, s := range b.enabled {
		// some candidates might provide net/rpc api for internal communications
		if api := s.RPC(); api != nil {
			b.rpc.RegisterName(s.Name(), api)
		}

		b.wg.Add(1)
		go func() {
			defer b.wg.Done()

			if err := s.Serve(); err != nil {
				logrus.Errorf("%s.start: %s", s.Name(), err)
			}
		}()
	}

	b.wg.Wait()
}

func (b *Bus) Stop() {
	for _, s := range b.enabled {
		if err := s.Stop(); err != nil {
			logrus.Errorf("%s.stop: %s", s.Name(), err)
		}
	}

	b.wg.Wait()
}