summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
committerWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
commit36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch)
tree13ca8abd454a6668f490eec2e44b1520bd3953fe /service
parentb02611b7266589d888e054a1d2e4432ae370617d (diff)
service bus, http service, rpc bus, cli commands, new configs
Diffstat (limited to 'service')
-rw-r--r--service/bus.go165
-rw-r--r--service/config.go6
-rw-r--r--service/factory.go69
-rw-r--r--service/rpc.go32
-rw-r--r--service/service.go9
5 files changed, 281 insertions, 0 deletions
diff --git a/service/bus.go b/service/bus.go
new file mode 100644
index 00000000..40e73c31
--- /dev/null
+++ b/service/bus.go
@@ -0,0 +1,165 @@
+package service
+
+import (
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+ "github.com/spiral/goridge"
+ "github.com/pkg/errors"
+)
+
+const (
+ rpcConfig = "rpc"
+)
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ wg sync.WaitGroup
+ services []Service
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ if segment := cfg.Get(rpcConfig); segment == nil {
+ logrus.Warn("rpc: no config has been provided")
+ } else {
+ b.rpcConfig = &RPCConfig{}
+ if err := segment.Unmarshal(b.rpcConfig); err != nil {
+ return err
+ }
+ }
+
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) RCPClient() (*rpc.Client, error) {
+ if b.rpcConfig == nil {
+ return nil, errors.New("rpc is not configured")
+ }
+
+ conn, err := b.rpcConfig.CreateDialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some services might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: ", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ logrus.Debug("rpc: started")
+ if err := b.serveRPC(); err != nil {
+ logrus.Errorf("rpc: %s", err)
+ }
+ }()
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ if err := b.stopRPC(); err != nil {
+ logrus.Errorf("rpc: ", err)
+ }
+
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: ", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) serveRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ b.stop = make(chan interface{})
+
+ ln, err := b.rpcConfig.CreateListener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ for {
+ select {
+ case <-b.stop:
+ b.rpc = nil
+ return nil
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ go b.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+
+ return nil
+}
+
+func (b *Bus) stopRPC() error {
+ if b.rpcConfig == nil {
+ return nil
+ }
+
+ close(b.stop)
+ return nil
+}
diff --git a/service/config.go b/service/config.go
new file mode 100644
index 00000000..d5381376
--- /dev/null
+++ b/service/config.go
@@ -0,0 +1,6 @@
+package service
+
+type Config interface {
+ Get(key string) Config
+ Unmarshal(out interface{}) error
+}
diff --git a/service/factory.go b/service/factory.go
new file mode 100644
index 00000000..dbdebc4f
--- /dev/null
+++ b/service/factory.go
@@ -0,0 +1,69 @@
+package service
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+ "os/exec"
+ "strings"
+ "net"
+)
+
+type PoolConfig struct {
+ Command string
+ Relay string
+
+ Number uint64
+ MaxJobs uint64
+
+ Timeouts struct {
+ Allocate int
+ Destroy int
+ }
+}
+
+func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) {
+ relays, terminator, err := f.relayFactory()
+ if err != nil {
+ terminator()
+ return nil, nil, err
+ }
+
+ rr := roadrunner.NewServer(f.cmd(), relays)
+ if err := rr.Configure(f.rrConfig()); err != nil {
+ return nil, nil, err
+ }
+
+ return rr, nil, nil
+}
+
+func (f *PoolConfig) rrConfig() roadrunner.Config {
+ return roadrunner.Config{
+ NumWorkers: f.Number,
+ MaxExecutions: f.MaxJobs,
+ AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate),
+ DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy),
+ }
+}
+
+func (f *PoolConfig) cmd() func() *exec.Cmd {
+ cmd := strings.Split(f.Command, " ")
+ return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) }
+}
+
+func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) {
+ if f.Relay == "pipes" || f.Relay == "pipe" {
+ return roadrunner.NewPipeFactory(), nil, nil
+ }
+
+ dsn := strings.Split(f.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, nil, dsnError
+ }
+
+ ln, err := net.Listen(dsn[0], dsn[1])
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil
+}
diff --git a/service/rpc.go b/service/rpc.go
new file mode 100644
index 00000000..eb128768
--- /dev/null
+++ b/service/rpc.go
@@ -0,0 +1,32 @@
+package service
+
+import (
+ "net"
+ "strings"
+)
+
+type RPCConfig struct {
+ Listen string
+}
+
+func (cfg *RPCConfig) CreateListener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, dsnError
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+func (cfg *RPCConfig) CreateDialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, dsnError
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
+
+func NewBus() *Bus {
+ return &Bus{services: make([]Service, 0)}
+}
diff --git a/service/service.go b/service/service.go
new file mode 100644
index 00000000..2f704657
--- /dev/null
+++ b/service/service.go
@@ -0,0 +1,9 @@
+package service
+
+type Service interface {
+ Name() string
+ Configure(cfg Config) (bool, error)
+ RPC() interface{}
+ Serve() error
+ Stop() error
+}