summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
Diffstat (limited to 'service')
-rw-r--r--service/bus.go165
-rw-r--r--service/config.go6
-rw-r--r--service/factory.go69
-rw-r--r--service/registry.go162
-rw-r--r--service/rpc.go32
-rw-r--r--service/service.go9
6 files changed, 162 insertions, 281 deletions
diff --git a/service/bus.go b/service/bus.go
deleted file mode 100644
index 8bfb914c..00000000
--- a/service/bus.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package service
-
-import (
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "github.com/spiral/goridge"
- "net/rpc"
- "sync"
-)
-
-const (
- rpcConfig = "rpc"
-)
-
-var (
- dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
-)
-
-type Bus struct {
- wg sync.WaitGroup
- services []Service
- enabled []Service
- stop chan interface{}
- rpc *rpc.Server
- rpcConfig *RPCConfig
-}
-
-func (b *Bus) Register(s Service) {
- b.services = append(b.services, s)
-}
-
-func (b *Bus) Services() []Service {
- return b.services
-}
-
-func (b *Bus) Configure(cfg Config) error {
- if segment := cfg.Get(rpcConfig); segment == nil {
- logrus.Warn("rpc: no config has been provided")
- } else {
- b.rpcConfig = &RPCConfig{}
- if err := segment.Unmarshal(b.rpcConfig); err != nil {
- return err
- }
- }
-
- b.enabled = make([]Service, 0)
-
- for _, s := range b.services {
- segment := cfg.Get(s.Name())
- if segment == nil {
- // no config has been provided for the service
- logrus.Debugf("%s: no config has been provided", s.Name())
- continue
- }
-
- if enable, err := s.Configure(segment); err != nil {
- return err
- } else if !enable {
- continue
- }
-
- b.enabled = append(b.enabled, s)
- }
-
- return nil
-}
-
-func (b *Bus) RCPClient() (*rpc.Client, error) {
- if b.rpcConfig == nil {
- return nil, errors.New("rpc is not configured")
- }
-
- conn, err := b.rpcConfig.CreateDialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
-}
-
-func (b *Bus) Serve() {
- b.rpc = rpc.NewServer()
-
- for _, s := range b.enabled {
- // some services might provide net/rpc api for internal communications
- if api := s.RPC(); api != nil {
- b.rpc.RegisterName(s.Name(), api)
- }
-
- b.wg.Add(1)
- go func() {
- defer b.wg.Done()
-
- if err := s.Serve(); err != nil {
- logrus.Errorf("%s.start: %s", s.Name(), err)
- }
- }()
- }
-
- b.wg.Add(1)
- go func() {
- defer b.wg.Done()
-
- logrus.Debug("rpc: started")
- if err := b.serveRPC(); err != nil {
- logrus.Errorf("rpc: %s", err)
- }
- }()
-
- b.wg.Wait()
-}
-
-func (b *Bus) Stop() {
- if err := b.stopRPC(); err != nil {
- logrus.Errorf("rpc: ", err)
- }
-
- for _, s := range b.enabled {
- if err := s.Stop(); err != nil {
- logrus.Errorf("%s.stop: %s", s.Name(), err)
- }
- }
-
- b.wg.Wait()
-}
-
-func (b *Bus) serveRPC() error {
- if b.rpcConfig == nil {
- return nil
- }
-
- b.stop = make(chan interface{})
-
- ln, err := b.rpcConfig.CreateListener()
- if err != nil {
- return err
- }
- defer ln.Close()
-
- for {
- select {
- case <-b.stop:
- b.rpc = nil
- return nil
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
- }
-
- go b.rpc.ServeCodec(goridge.NewCodec(conn))
- }
- }
-
- return nil
-}
-
-func (b *Bus) stopRPC() error {
- if b.rpcConfig == nil {
- return nil
- }
-
- close(b.stop)
- return nil
-}
diff --git a/service/config.go b/service/config.go
deleted file mode 100644
index d5381376..00000000
--- a/service/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package service
-
-type Config interface {
- Get(key string) Config
- Unmarshal(out interface{}) error
-}
diff --git a/service/factory.go b/service/factory.go
deleted file mode 100644
index e4a599e6..00000000
--- a/service/factory.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package service
-
-import (
- "github.com/spiral/roadrunner"
- "net"
- "os/exec"
- "strings"
- "time"
-)
-
-type PoolConfig struct {
- Command string
- Relay string
-
- Number uint64
- MaxJobs uint64
-
- Timeouts struct {
- Allocate int
- Destroy int
- }
-}
-
-func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) {
- relays, terminator, err := f.relayFactory()
- if err != nil {
- terminator()
- return nil, nil, err
- }
-
- rr := roadrunner.NewServer(f.cmd(), relays)
- if err := rr.Configure(f.rrConfig()); err != nil {
- return nil, nil, err
- }
-
- return rr, nil, nil
-}
-
-func (f *PoolConfig) rrConfig() roadrunner.Config {
- return roadrunner.Config{
- NumWorkers: f.Number,
- MaxExecutions: f.MaxJobs,
- AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate),
- DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy),
- }
-}
-
-func (f *PoolConfig) cmd() func() *exec.Cmd {
- cmd := strings.Split(f.Command, " ")
- return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) }
-}
-
-func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) {
- if f.Relay == "pipes" || f.Relay == "pipe" {
- return roadrunner.NewPipeFactory(), nil, nil
- }
-
- dsn := strings.Split(f.Relay, "://")
- if len(dsn) != 2 {
- return nil, nil, dsnError
- }
-
- ln, err := net.Listen(dsn[0], dsn[1])
- if err != nil {
- return nil, nil, err
- }
-
- return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil
-}
diff --git a/service/registry.go b/service/registry.go
new file mode 100644
index 00000000..d4e2ff12
--- /dev/null
+++ b/service/registry.go
@@ -0,0 +1,162 @@
+package service
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+// Registry controls all internal RR services and provides plugin based system.
+type Registry interface {
+ // Register add new service to the registry under given name.
+ Register(name string, service Service)
+
+ // Configure configures all underlying services with given configuration.
+ Configure(cfg Config) error
+
+ // Check is Service has been registered and configured.
+ Has(service string) bool
+
+ // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance.
+ Get(service string) Service
+
+ // Serve all configured services. Non blocking.
+ Serve() error
+
+ // Stop all active services.
+ Stop() error
+}
+
+// Service provides high level functionality for road runner Service.
+type Service interface {
+ // WithConfig must return Service instance configured with the given environment. Must return error in case of
+ // misconfiguration, might return nil as Service if Service is not enabled.
+ WithConfig(cfg Config, reg Registry) (Service, error)
+
+ // Serve serves Service.
+ Serve() error
+
+ // Stop stop Service Service.
+ Stop() error
+}
+
+type registry struct {
+ log logrus.FieldLogger
+ mu sync.Mutex
+ candidates []*entry
+ configured []*entry
+}
+
+// entry creates association between service instance and given name.
+type entry struct {
+ // Associated service name
+ Name string
+
+ // Associated service instance
+ Service Service
+
+ // Serving indicates that service is currently serving
+ Serving bool
+}
+
+// NewRegistry creates new registry.
+func NewRegistry(log logrus.FieldLogger) Registry {
+ return &registry{
+ log: log,
+ candidates: make([]*entry, 0),
+ }
+}
+
+// Register add new service to the registry under given name.
+func (r *registry) Register(name string, service Service) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ r.candidates = append(r.candidates, &entry{
+ Name: name,
+ Service: service,
+ Serving: false,
+ })
+
+ r.log.Debugf("%s.service: registered", name)
+}
+
+// Configure configures all underlying services with given configuration.
+func (r *registry) Configure(cfg Config) error {
+ if r.configured != nil {
+ return fmt.Errorf("service bus has been already configured")
+ }
+
+ r.configured = make([]*entry, 0)
+ for _, e := range r.candidates {
+ segment := cfg.Get(e.Name)
+ if segment == nil {
+ r.log.Debugf("%s.service: no config has been provided", e.Name)
+ continue
+ }
+
+ s, err := e.Service.WithConfig(segment, r)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name))
+ }
+
+ if s != nil {
+ r.configured = append(r.configured, &entry{
+ Name: e.Name,
+ Service: s,
+ Serving: false,
+ })
+ }
+ }
+
+ return nil
+}
+
+// Check is Service has been registered.
+func (r *registry) Has(service string) bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.configured {
+ if e.Name == service {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Get returns Service instance by it's Name or nil if Service not found.
+func (r *registry) Get(service string) Service {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.configured {
+ if e.Name == service {
+ return e.Service
+ }
+ }
+
+ return nil
+}
+
+// Serve all configured services. Non blocking.
+func (r *registry) Serve() error {
+ return nil
+}
+
+// Stop all active services.
+func (r *registry) Stop() error {
+ return nil
+}
diff --git a/service/rpc.go b/service/rpc.go
deleted file mode 100644
index eb128768..00000000
--- a/service/rpc.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package service
-
-import (
- "net"
- "strings"
-)
-
-type RPCConfig struct {
- Listen string
-}
-
-func (cfg *RPCConfig) CreateListener() (net.Listener, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, dsnError
- }
-
- return net.Listen(dsn[0], dsn[1])
-}
-
-func (cfg *RPCConfig) CreateDialer() (net.Conn, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, dsnError
- }
-
- return net.Dial(dsn[0], dsn[1])
-}
-
-func NewBus() *Bus {
- return &Bus{services: make([]Service, 0)}
-}
diff --git a/service/service.go b/service/service.go
deleted file mode 100644
index 2f704657..00000000
--- a/service/service.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package service
-
-type Service interface {
- Name() string
- Configure(cfg Config) (bool, error)
- RPC() interface{}
- Serve() error
- Stop() error
-}