diff options
author | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
commit | 36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch) | |
tree | 13ca8abd454a6668f490eec2e44b1520bd3953fe /service | |
parent | b02611b7266589d888e054a1d2e4432ae370617d (diff) |
service bus, http service, rpc bus, cli commands, new configs
Diffstat (limited to 'service')
-rw-r--r-- | service/bus.go | 165 | ||||
-rw-r--r-- | service/config.go | 6 | ||||
-rw-r--r-- | service/factory.go | 69 | ||||
-rw-r--r-- | service/rpc.go | 32 | ||||
-rw-r--r-- | service/service.go | 9 |
5 files changed, 281 insertions, 0 deletions
diff --git a/service/bus.go b/service/bus.go new file mode 100644 index 00000000..40e73c31 --- /dev/null +++ b/service/bus.go @@ -0,0 +1,165 @@ +package service + +import ( + "github.com/sirupsen/logrus" + "net/rpc" + "sync" + "github.com/spiral/goridge" + "github.com/pkg/errors" +) + +const ( + rpcConfig = "rpc" +) + +var ( + dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") +) + +type Bus struct { + wg sync.WaitGroup + services []Service + enabled []Service + stop chan interface{} + rpc *rpc.Server + rpcConfig *RPCConfig +} + +func (b *Bus) Register(s Service) { + b.services = append(b.services, s) +} + +func (b *Bus) Services() []Service { + return b.services +} + +func (b *Bus) Configure(cfg Config) error { + if segment := cfg.Get(rpcConfig); segment == nil { + logrus.Warn("rpc: no config has been provided") + } else { + b.rpcConfig = &RPCConfig{} + if err := segment.Unmarshal(b.rpcConfig); err != nil { + return err + } + } + + b.enabled = make([]Service, 0) + + for _, s := range b.services { + segment := cfg.Get(s.Name()) + if segment == nil { + // no config has been provided for the service + logrus.Debugf("%s: no config has been provided", s.Name()) + continue + } + + if enable, err := s.Configure(segment); err != nil { + return err + } else if !enable { + continue + } + + b.enabled = append(b.enabled, s) + } + + return nil +} + +func (b *Bus) RCPClient() (*rpc.Client, error) { + if b.rpcConfig == nil { + return nil, errors.New("rpc is not configured") + } + + conn, err := b.rpcConfig.CreateDialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} + +func (b *Bus) Serve() { + b.rpc = rpc.NewServer() + + for _, s := range b.enabled { + // some services might provide net/rpc api for internal communications + if api := s.RPC(); api != nil { + b.rpc.RegisterName(s.Name(), api) + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + if err := s.Serve(); err != nil { + logrus.Errorf("%s.start: ", s.Name(), err) + } + }() + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + logrus.Debug("rpc: started") + if err := b.serveRPC(); err != nil { + logrus.Errorf("rpc: %s", err) + } + }() + + b.wg.Wait() +} + +func (b *Bus) Stop() { + if err := b.stopRPC(); err != nil { + logrus.Errorf("rpc: ", err) + } + + for _, s := range b.enabled { + if err := s.Stop(); err != nil { + logrus.Errorf("%s.stop: ", s.Name(), err) + } + } + + b.wg.Wait() +} + +func (b *Bus) serveRPC() error { + if b.rpcConfig == nil { + return nil + } + + b.stop = make(chan interface{}) + + ln, err := b.rpcConfig.CreateListener() + if err != nil { + return err + } + defer ln.Close() + + for { + select { + case <-b.stop: + b.rpc = nil + return nil + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + go b.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + + return nil +} + +func (b *Bus) stopRPC() error { + if b.rpcConfig == nil { + return nil + } + + close(b.stop) + return nil +} diff --git a/service/config.go b/service/config.go new file mode 100644 index 00000000..d5381376 --- /dev/null +++ b/service/config.go @@ -0,0 +1,6 @@ +package service + +type Config interface { + Get(key string) Config + Unmarshal(out interface{}) error +} diff --git a/service/factory.go b/service/factory.go new file mode 100644 index 00000000..dbdebc4f --- /dev/null +++ b/service/factory.go @@ -0,0 +1,69 @@ +package service + +import ( + "github.com/spiral/roadrunner" + "time" + "os/exec" + "strings" + "net" +) + +type PoolConfig struct { + Command string + Relay string + + Number uint64 + MaxJobs uint64 + + Timeouts struct { + Allocate int + Destroy int + } +} + +func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { + relays, terminator, err := f.relayFactory() + if err != nil { + terminator() + return nil, nil, err + } + + rr := roadrunner.NewServer(f.cmd(), relays) + if err := rr.Configure(f.rrConfig()); err != nil { + return nil, nil, err + } + + return rr, nil, nil +} + +func (f *PoolConfig) rrConfig() roadrunner.Config { + return roadrunner.Config{ + NumWorkers: f.Number, + MaxExecutions: f.MaxJobs, + AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate), + DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy), + } +} + +func (f *PoolConfig) cmd() func() *exec.Cmd { + cmd := strings.Split(f.Command, " ") + return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } +} + +func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { + if f.Relay == "pipes" || f.Relay == "pipe" { + return roadrunner.NewPipeFactory(), nil, nil + } + + dsn := strings.Split(f.Relay, "://") + if len(dsn) != 2 { + return nil, nil, dsnError + } + + ln, err := net.Listen(dsn[0], dsn[1]) + if err != nil { + return nil, nil, err + } + + return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil +} diff --git a/service/rpc.go b/service/rpc.go new file mode 100644 index 00000000..eb128768 --- /dev/null +++ b/service/rpc.go @@ -0,0 +1,32 @@ +package service + +import ( + "net" + "strings" +) + +type RPCConfig struct { + Listen string +} + +func (cfg *RPCConfig) CreateListener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, dsnError + } + + return net.Listen(dsn[0], dsn[1]) +} + +func (cfg *RPCConfig) CreateDialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, dsnError + } + + return net.Dial(dsn[0], dsn[1]) +} + +func NewBus() *Bus { + return &Bus{services: make([]Service, 0)} +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 00000000..2f704657 --- /dev/null +++ b/service/service.go @@ -0,0 +1,9 @@ +package service + +type Service interface { + Name() string + Configure(cfg Config) (bool, error) + RPC() interface{} + Serve() error + Stop() error +} |