From 76ff8d1c95e087749d559ee5a4f8f0348feafffa Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Tue, 5 Jun 2018 16:23:14 +0300 Subject: Cs and refactoring --- service/bus.go | 165 ---------------------------------------------------- service/config.go | 6 -- service/factory.go | 69 ---------------------- service/registry.go | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++ service/rpc.go | 32 ---------- service/service.go | 9 --- 6 files changed, 162 insertions(+), 281 deletions(-) delete mode 100644 service/bus.go delete mode 100644 service/config.go delete mode 100644 service/factory.go create mode 100644 service/registry.go delete mode 100644 service/rpc.go delete mode 100644 service/service.go (limited to 'service') diff --git a/service/bus.go b/service/bus.go deleted file mode 100644 index 8bfb914c..00000000 --- a/service/bus.go +++ /dev/null @@ -1,165 +0,0 @@ -package service - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spiral/goridge" - "net/rpc" - "sync" -) - -const ( - rpcConfig = "rpc" -) - -var ( - dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") -) - -type Bus struct { - wg sync.WaitGroup - services []Service - enabled []Service - stop chan interface{} - rpc *rpc.Server - rpcConfig *RPCConfig -} - -func (b *Bus) Register(s Service) { - b.services = append(b.services, s) -} - -func (b *Bus) Services() []Service { - return b.services -} - -func (b *Bus) Configure(cfg Config) error { - if segment := cfg.Get(rpcConfig); segment == nil { - logrus.Warn("rpc: no config has been provided") - } else { - b.rpcConfig = &RPCConfig{} - if err := segment.Unmarshal(b.rpcConfig); err != nil { - return err - } - } - - b.enabled = make([]Service, 0) - - for _, s := range b.services { - segment := cfg.Get(s.Name()) - if segment == nil { - // no config has been provided for the service - logrus.Debugf("%s: no config has been provided", s.Name()) - continue - } - - if enable, err := s.Configure(segment); err != nil { - return err - } else if !enable { - continue - } - - b.enabled = append(b.enabled, s) - } - - return nil -} - -func (b *Bus) RCPClient() (*rpc.Client, error) { - if b.rpcConfig == nil { - return nil, errors.New("rpc is not configured") - } - - conn, err := b.rpcConfig.CreateDialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} - -func (b *Bus) Serve() { - b.rpc = rpc.NewServer() - - for _, s := range b.enabled { - // some services might provide net/rpc api for internal communications - if api := s.RPC(); api != nil { - b.rpc.RegisterName(s.Name(), api) - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: %s", s.Name(), err) - } - }() - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - logrus.Debug("rpc: started") - if err := b.serveRPC(); err != nil { - logrus.Errorf("rpc: %s", err) - } - }() - - b.wg.Wait() -} - -func (b *Bus) Stop() { - if err := b.stopRPC(); err != nil { - logrus.Errorf("rpc: ", err) - } - - for _, s := range b.enabled { - if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: %s", s.Name(), err) - } - } - - b.wg.Wait() -} - -func (b *Bus) serveRPC() error { - if b.rpcConfig == nil { - return nil - } - - b.stop = make(chan interface{}) - - ln, err := b.rpcConfig.CreateListener() - if err != nil { - return err - } - defer ln.Close() - - for { - select { - case <-b.stop: - b.rpc = nil - return nil - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go b.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - - return nil -} - -func (b *Bus) stopRPC() error { - if b.rpcConfig == nil { - return nil - } - - close(b.stop) - return nil -} diff --git a/service/config.go b/service/config.go deleted file mode 100644 index d5381376..00000000 --- a/service/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package service - -type Config interface { - Get(key string) Config - Unmarshal(out interface{}) error -} diff --git a/service/factory.go b/service/factory.go deleted file mode 100644 index e4a599e6..00000000 --- a/service/factory.go +++ /dev/null @@ -1,69 +0,0 @@ -package service - -import ( - "github.com/spiral/roadrunner" - "net" - "os/exec" - "strings" - "time" -) - -type PoolConfig struct { - Command string - Relay string - - Number uint64 - MaxJobs uint64 - - Timeouts struct { - Allocate int - Destroy int - } -} - -func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { - relays, terminator, err := f.relayFactory() - if err != nil { - terminator() - return nil, nil, err - } - - rr := roadrunner.NewServer(f.cmd(), relays) - if err := rr.Configure(f.rrConfig()); err != nil { - return nil, nil, err - } - - return rr, nil, nil -} - -func (f *PoolConfig) rrConfig() roadrunner.Config { - return roadrunner.Config{ - NumWorkers: f.Number, - MaxExecutions: f.MaxJobs, - AllocateTimeout: time.Second * time.Duration(f.Timeouts.Allocate), - DestroyTimeout: time.Second * time.Duration(f.Timeouts.Destroy), - } -} - -func (f *PoolConfig) cmd() func() *exec.Cmd { - cmd := strings.Split(f.Command, " ") - return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } -} - -func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { - if f.Relay == "pipes" || f.Relay == "pipe" { - return roadrunner.NewPipeFactory(), nil, nil - } - - dsn := strings.Split(f.Relay, "://") - if len(dsn) != 2 { - return nil, nil, dsnError - } - - ln, err := net.Listen(dsn[0], dsn[1]) - if err != nil { - return nil, nil, err - } - - return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil -} diff --git a/service/registry.go b/service/registry.go new file mode 100644 index 00000000..d4e2ff12 --- /dev/null +++ b/service/registry.go @@ -0,0 +1,162 @@ +package service + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// Registry controls all internal RR services and provides plugin based system. +type Registry interface { + // Register add new service to the registry under given name. + Register(name string, service Service) + + // Configure configures all underlying services with given configuration. + Configure(cfg Config) error + + // Check is Service has been registered and configured. + Has(service string) bool + + // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance. + Get(service string) Service + + // Serve all configured services. Non blocking. + Serve() error + + // Stop all active services. + Stop() error +} + +// Service provides high level functionality for road runner Service. +type Service interface { + // WithConfig must return Service instance configured with the given environment. Must return error in case of + // misconfiguration, might return nil as Service if Service is not enabled. + WithConfig(cfg Config, reg Registry) (Service, error) + + // Serve serves Service. + Serve() error + + // Stop stop Service Service. + Stop() error +} + +type registry struct { + log logrus.FieldLogger + mu sync.Mutex + candidates []*entry + configured []*entry +} + +// entry creates association between service instance and given name. +type entry struct { + // Associated service name + Name string + + // Associated service instance + Service Service + + // Serving indicates that service is currently serving + Serving bool +} + +// NewRegistry creates new registry. +func NewRegistry(log logrus.FieldLogger) Registry { + return ®istry{ + log: log, + candidates: make([]*entry, 0), + } +} + +// Register add new service to the registry under given name. +func (r *registry) Register(name string, service Service) { + r.mu.Lock() + defer r.mu.Unlock() + + r.candidates = append(r.candidates, &entry{ + Name: name, + Service: service, + Serving: false, + }) + + r.log.Debugf("%s.service: registered", name) +} + +// Configure configures all underlying services with given configuration. +func (r *registry) Configure(cfg Config) error { + if r.configured != nil { + return fmt.Errorf("service bus has been already configured") + } + + r.configured = make([]*entry, 0) + for _, e := range r.candidates { + segment := cfg.Get(e.Name) + if segment == nil { + r.log.Debugf("%s.service: no config has been provided", e.Name) + continue + } + + s, err := e.Service.WithConfig(segment, r) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name)) + } + + if s != nil { + r.configured = append(r.configured, &entry{ + Name: e.Name, + Service: s, + Serving: false, + }) + } + } + + return nil +} + +// Check is Service has been registered. +func (r *registry) Has(service string) bool { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return true + } + } + + return false +} + +// Get returns Service instance by it's Name or nil if Service not found. +func (r *registry) Get(service string) Service { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return e.Service + } + } + + return nil +} + +// Serve all configured services. Non blocking. +func (r *registry) Serve() error { + return nil +} + +// Stop all active services. +func (r *registry) Stop() error { + return nil +} diff --git a/service/rpc.go b/service/rpc.go deleted file mode 100644 index eb128768..00000000 --- a/service/rpc.go +++ /dev/null @@ -1,32 +0,0 @@ -package service - -import ( - "net" - "strings" -) - -type RPCConfig struct { - Listen string -} - -func (cfg *RPCConfig) CreateListener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Listen(dsn[0], dsn[1]) -} - -func (cfg *RPCConfig) CreateDialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Dial(dsn[0], dsn[1]) -} - -func NewBus() *Bus { - return &Bus{services: make([]Service, 0)} -} diff --git a/service/service.go b/service/service.go deleted file mode 100644 index 2f704657..00000000 --- a/service/service.go +++ /dev/null @@ -1,9 +0,0 @@ -package service - -type Service interface { - Name() string - Configure(cfg Config) (bool, error) - RPC() interface{} - Serve() error - Stop() error -} -- cgit v1.2.3