diff options
-rw-r--r-- | cmd/_____/bus.go | 94 | ||||
-rw-r--r-- | cmd/_____/factory.go (renamed from service/factory.go) | 8 | ||||
-rw-r--r-- | cmd/_____/http/config.go (renamed from http/config.go) | 2 | ||||
-rw-r--r-- | cmd/_____/http/data.go (renamed from http/data.go) | 0 | ||||
-rw-r--r-- | cmd/_____/http/request.go (renamed from http/request.go) | 0 | ||||
-rw-r--r-- | cmd/_____/http/response.go (renamed from http/response.go) | 0 | ||||
-rw-r--r-- | cmd/_____/http/rpc.go (renamed from http/rpc.go) | 2 | ||||
-rw-r--r-- | cmd/_____/http/server.go (renamed from http/server.go) | 0 | ||||
-rw-r--r-- | cmd/_____/http/service.go (renamed from http/service.go) | 4 | ||||
-rw-r--r-- | cmd/_____/http/static.go (renamed from http/static.go) | 0 | ||||
-rw-r--r-- | cmd/_____/http/uploads.go (renamed from http/uploads.go) | 0 | ||||
-rw-r--r-- | cmd/_____/utils/size.go (renamed from utils/size.go) | 0 | ||||
-rw-r--r-- | cmd/_____/utils/workers.go (renamed from utils/workers.go) | 2 | ||||
-rw-r--r-- | cmd/_____/verbose.go (renamed from cmd/rr/utils/verbose.go) | 2 | ||||
-rw-r--r-- | cmd/rr/.rr.yaml | 3 | ||||
-rw-r--r-- | cmd/rr/cmd/root.go | 28 | ||||
-rw-r--r-- | cmd/rr/http/register.go | 23 | ||||
-rw-r--r-- | cmd/rr/http/reload.go | 28 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 59 | ||||
-rw-r--r-- | cmd/rr/main.go | 10 | ||||
-rw-r--r-- | cmd/rr/utils/config.go | 11 | ||||
-rw-r--r-- | rpc/config.go | 35 | ||||
-rw-r--r-- | rpc/service.go | 99 | ||||
-rw-r--r-- | service/bus.go | 165 | ||||
-rw-r--r-- | service/config.go | 6 | ||||
-rw-r--r-- | service/registry.go | 162 | ||||
-rw-r--r-- | service/rpc.go | 32 | ||||
-rw-r--r-- | service/service.go | 9 |
28 files changed, 490 insertions, 294 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go new file mode 100644 index 00000000..813a6c3b --- /dev/null +++ b/cmd/_____/bus.go @@ -0,0 +1,94 @@ +package _____ + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "net/rpc" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +var ( + dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") +) + +type Bus struct { + services []Service + wg sync.WaitGroup + enabled []Service + stop chan interface{} + rpc *rpc.Server + rpcConfig *RPCConfig +} + +func (b *Bus) Register(s Service) { + b.services = append(b.services, s) +} + +func (b *Bus) Services() []Service { + return b.services +} + +func (b *Bus) Configure(cfg Config) error { + b.enabled = make([]Service, 0) + + for _, s := range b.services { + segment := cfg.Get(s.Name()) + if segment == nil { + // no config has been provided for the Service + logrus.Debugf("%s: no config has been provided", s.Name()) + continue + } + + if enable, err := s.Configure(segment); err != nil { + return err + } else if !enable { + continue + } + + b.enabled = append(b.enabled, s) + } + + return nil +} + +func (b *Bus) Serve() { + b.rpc = rpc.NewServer() + + for _, s := range b.enabled { + // some candidates might provide net/rpc api for internal communications + if api := s.RPC(); api != nil { + b.rpc.RegisterName(s.Name(), api) + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + + if err := s.Serve(); err != nil { + logrus.Errorf("%s.start: %s", s.Name(), err) + } + }() + } + + b.wg.Wait() +} + +func (b *Bus) Stop() { + for _, s := range b.enabled { + if err := s.Stop(); err != nil { + logrus.Errorf("%s.stop: %s", s.Name(), err) + } + } + + b.wg.Wait() +} diff --git a/service/factory.go b/cmd/_____/factory.go index e4a599e6..8ecf90ca 100644 --- a/service/factory.go +++ b/cmd/_____/factory.go @@ -1,4 +1,4 @@ -package service +package _____ import ( "github.com/spiral/roadrunner" @@ -8,6 +8,7 @@ import ( "time" ) +// todo: move out type PoolConfig struct { Command string Relay string @@ -16,8 +17,9 @@ type PoolConfig struct { MaxJobs uint64 Timeouts struct { - Allocate int - Destroy int + Construct int + Allocate int + Destroy int } } diff --git a/http/config.go b/cmd/_____/http/config.go index 2a64dbab..54e39a7d 100644 --- a/http/config.go +++ b/cmd/_____/http/config.go @@ -3,7 +3,7 @@ package http import ( "fmt" "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/utils" + "github.com/spiral/roadrunner/cmd/_____/utils" "os" "path" "strings" diff --git a/http/data.go b/cmd/_____/http/data.go index e6b8344f..e6b8344f 100644 --- a/http/data.go +++ b/cmd/_____/http/data.go diff --git a/http/request.go b/cmd/_____/http/request.go index fd483744..fd483744 100644 --- a/http/request.go +++ b/cmd/_____/http/request.go diff --git a/http/response.go b/cmd/_____/http/response.go index 2736c4ab..2736c4ab 100644 --- a/http/response.go +++ b/cmd/_____/http/response.go diff --git a/http/rpc.go b/cmd/_____/http/rpc.go index 38db9a61..1bc8a06b 100644 --- a/http/rpc.go +++ b/cmd/_____/http/rpc.go @@ -2,7 +2,7 @@ package http import ( "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/utils" + "github.com/spiral/roadrunner/cmd/_____/utils" "github.com/pkg/errors" ) diff --git a/http/server.go b/cmd/_____/http/server.go index db1f22ef..db1f22ef 100644 --- a/http/server.go +++ b/cmd/_____/http/server.go diff --git a/http/service.go b/cmd/_____/http/service.go index 5d45240b..008aeab8 100644 --- a/http/service.go +++ b/cmd/_____/http/service.go @@ -8,6 +8,8 @@ import ( "github.com/spiral/roadrunner" ) +const ServiceName = "http" + type Service struct { cfg *serviceConfig http *http.Server @@ -15,7 +17,7 @@ type Service struct { } func (s *Service) Name() string { - return "http" + return ServiceName } func (s *Service) Configure(cfg service.Config) (bool, error) { diff --git a/http/static.go b/cmd/_____/http/static.go index d7030c3f..d7030c3f 100644 --- a/http/static.go +++ b/cmd/_____/http/static.go diff --git a/http/uploads.go b/cmd/_____/http/uploads.go index 468e8a19..468e8a19 100644 --- a/http/uploads.go +++ b/cmd/_____/http/uploads.go diff --git a/utils/size.go b/cmd/_____/utils/size.go index 176cc9e1..176cc9e1 100644 --- a/utils/size.go +++ b/cmd/_____/utils/size.go diff --git a/utils/workers.go b/cmd/_____/utils/workers.go index 0c4f778f..1024b4c6 100644 --- a/utils/workers.go +++ b/cmd/_____/utils/workers.go @@ -34,4 +34,4 @@ func FetchWorkers(srv *roadrunner.Server) (result []Worker) { } return -} +}
\ No newline at end of file diff --git a/cmd/rr/utils/verbose.go b/cmd/_____/verbose.go index 43770f34..d0088b69 100644 --- a/cmd/rr/utils/verbose.go +++ b/cmd/_____/verbose.go @@ -1,4 +1,4 @@ -package utils +package _____ //if f.Verbose { // rr.Observe(func(event int, ctx interface{}) { diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index 2717e187..6e3b689b 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -1,5 +1,8 @@ # rpc bus allows php application and external clients to talk to rr services. rpc: + # enable rpc server + enable: true + # rpc connection DSN. Supported TCP and Unix sockets. listen: tcp://127.0.0.1:6001 diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index d0cac5ef..d54437f0 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -31,24 +31,29 @@ import ( // Service bus for all the commands. var ( - // Shared service bus. - Services = service.NewBus() + cfgFile string + verbose bool + + // Logger - shared logger. + Logger = logrus.New() + + // Services - shared service bus. + Services = service.NewRegistry(Logger) // CLI is application endpoint. CLI = &cobra.Command{ - Use: "rr", - Short: "RoadRunner, PHP application server", + Use: "rr", + SilenceErrors: true, + SilenceUsage: true, + Short: utils.Sprintf("<green>RoadRunner, PHP Application Server.</reset>"), } - - cfgFile string - verbose bool ) // Execute adds all child commands to the CLI command and sets flags appropriately. // This is called by main.main(). It only needs to happen once to the CLI. func Execute() { if err := CLI.Execute(); err != nil { - logrus.Error(err) + utils.Printf("Error: <red>%s</reset>\n", err) os.Exit(1) } } @@ -59,7 +64,7 @@ func init() { cobra.OnInitialize(func() { if verbose { - logrus.SetLevel(logrus.DebugLevel) + Logger.SetLevel(logrus.DebugLevel) } if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { @@ -81,6 +86,7 @@ func initConfig(cfgFile string, path []string, name string) service.Config { for _, p := range path { cfg.AddConfigPath(p) } + cfg.SetConfigName(name) } @@ -89,9 +95,9 @@ func initConfig(cfgFile string, path []string, name string) service.Config { // If a cfg file is found, read it in. if err := cfg.ReadInConfig(); err != nil { - logrus.Warnf("config: %s", err) + Logger.Warnf("config: %s", err) return nil } - return &utils.ConfigWrapper{cfg} + return &utils.ViperWrapper{Viper: cfg} } diff --git a/cmd/rr/http/register.go b/cmd/rr/http/register.go deleted file mode 100644 index fb828578..00000000 --- a/cmd/rr/http/register.go +++ /dev/null @@ -1,23 +0,0 @@ -package http - -import ( - "github.com/spf13/cobra" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/http" -) - -func init() { - rr.Services.Register(&http.Service{}) - - rr.CLI.AddCommand(&cobra.Command{ - Use: "http:reload", - Short: "Reload RoadRunner worker pools for the HTTP service", - Run: reloadHandler, - }) - - rr.CLI.AddCommand(&cobra.Command{ - Use: "http:workers", - Short: "List workers associated with RoadRunner HTTP service", - Run: workersHandler, - }) -} diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go index 6cdba576..0fd3d7e9 100644 --- a/cmd/rr/http/reload.go +++ b/cmd/rr/http/reload.go @@ -23,22 +23,34 @@ package http import ( "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/sirupsen/logrus" + "github.com/go-errors/errors" + "github.com/spiral/roadrunner/rpc" ) -func reloadHandler(cmd *cobra.Command, args []string) { - client, err := rr.Services.RCPClient() +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:reload", + Short: "Reload RoadRunner worker pools for the HTTP service", + RunE: reloadHandler, + }) +} + +func reloadHandler(cmd *cobra.Command, args []string) error { + if !rr.Services.Has("rpc") { + return errors.New("RPC service is not configured") + } + + client, err := rr.Services.Get("rpc").(*rpc.Service).Client() if err != nil { - logrus.Error(err) - return + return err } defer client.Close() var r string if err := client.Call("http.Reset", true, &r); err != nil { - logrus.Error(err) - return + return err } - logrus.Info("restarting http worker pool") + rr.Logger.Info("http.service: restarting worker pool") + return nil } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index 13e8d21c..63ef0cce 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -21,38 +21,47 @@ package http import ( - "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/http" - "os" - "strconv" - "github.com/sirupsen/logrus" + "errors" + "github.com/spiral/roadrunner/rpc" ) -func workersHandler(cmd *cobra.Command, args []string) { - client, err := rr.Services.RCPClient() - if err != nil { - logrus.Error(err) - return - } - defer client.Close() +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:workers", + Short: "List workers associated with RoadRunner HTTP service", + RunE: workersHandler, + }) +} - var r http.WorkerList - if err := client.Call("http.Workers", true, &r); err != nil { - panic(err) +func workersHandler(cmd *cobra.Command, args []string) error { + if !rr.Services.Has("rpc") { + return errors.New("RPC service is not configured") } - tw := tablewriter.NewWriter(os.Stdout) - tw.SetHeader([]string{"PID", "Status", "Num Execs"}) - - for _, w := range r.Workers { - tw.Append([]string{ - strconv.Itoa(w.Pid), - w.Status, - strconv.Itoa(int(w.NumExecs)), - }) + client, err := rr.Services.Get("rpc").(*rpc.Service).Client() + if err != nil { + return err } + defer client.Close() - tw.Render() + //var r http.WorkerList + //if err := client.Call("http.Workers", true, &r); err != nil { + // panic(err) + //} + // + //tw := tablewriter.NewWriter(os.Stdout) + //tw.SetHeader([]string{"PID", "Status", "Num Execs"}) + // + //for _, w := range r.Workers { + // tw.Append([]string{ + // strconv.Itoa(w.Pid), + // w.Status, + // strconv.Itoa(int(w.NumExecs)), + // }) + //} + // + //tw.Render() + return nil } diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 9d6f685c..26f70fdd 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -23,13 +23,17 @@ package main import ( - "github.com/spiral/roadrunner/cmd/rr/cmd" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/rpc" - // service plugins + // cli plugins _ "github.com/spiral/roadrunner/cmd/rr/http" ) func main() { + // provides ability to make local connection to services + rr.Services.Register("rpc", new(rpc.Service)) + // you can register additional commands using cmd.CLI - cmd.Execute() + rr.Execute() } diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go index e7e22b3a..452dd195 100644 --- a/cmd/rr/utils/config.go +++ b/cmd/rr/utils/config.go @@ -5,19 +5,22 @@ import ( "github.com/spiral/roadrunner/service" ) -type ConfigWrapper struct { +// ViperWrapper provides interface bridge between Viper configs and service.Config. +type ViperWrapper struct { Viper *viper.Viper } -func (w *ConfigWrapper) Get(key string) service.Config { +// Get nested config section (sub-map), returns nil if section not found. +func (w *ViperWrapper) Get(key string) service.Config { sub := w.Viper.Sub(key) if sub == nil { return nil } - return &ConfigWrapper{sub} + return &ViperWrapper{sub} } -func (w *ConfigWrapper) Unmarshal(out interface{}) error { +// Unmarshal unmarshal config data into given struct. +func (w *ViperWrapper) Unmarshal(out interface{}) error { return w.Viper.Unmarshal(out) } diff --git a/rpc/config.go b/rpc/config.go new file mode 100644 index 00000000..67dc1094 --- /dev/null +++ b/rpc/config.go @@ -0,0 +1,35 @@ +package rpc + +import ( + "errors" + "net" + "strings" +) + +type config struct { + // Indicates if RPC connection is enabled. + Enable bool + + // Listen string + Listen string +} + +// listener creates new rpc socket listener. +func (cfg *config) listener() (net.Listener, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Listen(dsn[0], dsn[1]) +} + +// dialer creates rpc socket dialer. +func (cfg *config) dialer() (net.Conn, error) { + dsn := strings.Split(cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") + } + + return net.Dial(dsn[0], dsn[1]) +} diff --git a/rpc/service.go b/rpc/service.go new file mode 100644 index 00000000..61a9a1a3 --- /dev/null +++ b/rpc/service.go @@ -0,0 +1,99 @@ +package rpc + +import ( + "errors" + "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" + "net/rpc" +) + +// Service is RPC service. +type Service struct { + cfg *config + stop chan interface{} + rpc *rpc.Server +} + +// WithConfig must return Service instance configured with the given environment. Must return error in case of +// misconfiguration, might return nil as Service if Service is not enabled. +func (s *Service) WithConfig(cfg service.Config, reg service.Registry) (service.Service, error) { + config := &config{} + if err := cfg.Unmarshal(config); err != nil { + return nil, err + } + + if !config.Enable { + return nil, nil + } + + return &Service{cfg: config, rpc: rpc.NewServer()}, nil +} + +// Serve serves Service. +func (s *Service) Serve() error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + s.stop = make(chan interface{}) + + ln, err := s.cfg.listener() + if err != nil { + return err + } + defer ln.Close() + + for { + select { + case <-s.stop: + return nil + default: + conn, err := ln.Accept() + if err != nil { + continue + } + + s.rpc.Accept(ln) + + go s.rpc.ServeCodec(goridge.NewCodec(conn)) + } + } + + return nil +} + +// Stop stop Service Service. +func (s *Service) Stop() error { + close(s.stop) + return nil +} + +// Register publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - exported method of exported type +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// It returns an error if the receiver is not an exported type or has +// no suitable methods. It also logs the error using package log. +func (s *Service) Register(name string, rcvr interface{}) error { + if s.rpc == nil { + return errors.New("RPC service is not configured") + } + + return s.rpc.RegisterName(name, rcvr) +} + +// Client creates new RPC client. +func (s *Service) Client() (*rpc.Client, error) { + if s.cfg == nil { + return nil, errors.New("RPC service is not configured") + } + + conn, err := s.cfg.dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil +} diff --git a/service/bus.go b/service/bus.go deleted file mode 100644 index 8bfb914c..00000000 --- a/service/bus.go +++ /dev/null @@ -1,165 +0,0 @@ -package service - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "github.com/spiral/goridge" - "net/rpc" - "sync" -) - -const ( - rpcConfig = "rpc" -) - -var ( - dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") -) - -type Bus struct { - wg sync.WaitGroup - services []Service - enabled []Service - stop chan interface{} - rpc *rpc.Server - rpcConfig *RPCConfig -} - -func (b *Bus) Register(s Service) { - b.services = append(b.services, s) -} - -func (b *Bus) Services() []Service { - return b.services -} - -func (b *Bus) Configure(cfg Config) error { - if segment := cfg.Get(rpcConfig); segment == nil { - logrus.Warn("rpc: no config has been provided") - } else { - b.rpcConfig = &RPCConfig{} - if err := segment.Unmarshal(b.rpcConfig); err != nil { - return err - } - } - - b.enabled = make([]Service, 0) - - for _, s := range b.services { - segment := cfg.Get(s.Name()) - if segment == nil { - // no config has been provided for the service - logrus.Debugf("%s: no config has been provided", s.Name()) - continue - } - - if enable, err := s.Configure(segment); err != nil { - return err - } else if !enable { - continue - } - - b.enabled = append(b.enabled, s) - } - - return nil -} - -func (b *Bus) RCPClient() (*rpc.Client, error) { - if b.rpcConfig == nil { - return nil, errors.New("rpc is not configured") - } - - conn, err := b.rpcConfig.CreateDialer() - if err != nil { - return nil, err - } - - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil -} - -func (b *Bus) Serve() { - b.rpc = rpc.NewServer() - - for _, s := range b.enabled { - // some services might provide net/rpc api for internal communications - if api := s.RPC(); api != nil { - b.rpc.RegisterName(s.Name(), api) - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: %s", s.Name(), err) - } - }() - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - logrus.Debug("rpc: started") - if err := b.serveRPC(); err != nil { - logrus.Errorf("rpc: %s", err) - } - }() - - b.wg.Wait() -} - -func (b *Bus) Stop() { - if err := b.stopRPC(); err != nil { - logrus.Errorf("rpc: ", err) - } - - for _, s := range b.enabled { - if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: %s", s.Name(), err) - } - } - - b.wg.Wait() -} - -func (b *Bus) serveRPC() error { - if b.rpcConfig == nil { - return nil - } - - b.stop = make(chan interface{}) - - ln, err := b.rpcConfig.CreateListener() - if err != nil { - return err - } - defer ln.Close() - - for { - select { - case <-b.stop: - b.rpc = nil - return nil - default: - conn, err := ln.Accept() - if err != nil { - continue - } - - go b.rpc.ServeCodec(goridge.NewCodec(conn)) - } - } - - return nil -} - -func (b *Bus) stopRPC() error { - if b.rpcConfig == nil { - return nil - } - - close(b.stop) - return nil -} diff --git a/service/config.go b/service/config.go deleted file mode 100644 index d5381376..00000000 --- a/service/config.go +++ /dev/null @@ -1,6 +0,0 @@ -package service - -type Config interface { - Get(key string) Config - Unmarshal(out interface{}) error -} diff --git a/service/registry.go b/service/registry.go new file mode 100644 index 00000000..d4e2ff12 --- /dev/null +++ b/service/registry.go @@ -0,0 +1,162 @@ +package service + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sync" +) + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // Get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// Registry controls all internal RR services and provides plugin based system. +type Registry interface { + // Register add new service to the registry under given name. + Register(name string, service Service) + + // Configure configures all underlying services with given configuration. + Configure(cfg Config) error + + // Check is Service has been registered and configured. + Has(service string) bool + + // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance. + Get(service string) Service + + // Serve all configured services. Non blocking. + Serve() error + + // Stop all active services. + Stop() error +} + +// Service provides high level functionality for road runner Service. +type Service interface { + // WithConfig must return Service instance configured with the given environment. Must return error in case of + // misconfiguration, might return nil as Service if Service is not enabled. + WithConfig(cfg Config, reg Registry) (Service, error) + + // Serve serves Service. + Serve() error + + // Stop stop Service Service. + Stop() error +} + +type registry struct { + log logrus.FieldLogger + mu sync.Mutex + candidates []*entry + configured []*entry +} + +// entry creates association between service instance and given name. +type entry struct { + // Associated service name + Name string + + // Associated service instance + Service Service + + // Serving indicates that service is currently serving + Serving bool +} + +// NewRegistry creates new registry. +func NewRegistry(log logrus.FieldLogger) Registry { + return ®istry{ + log: log, + candidates: make([]*entry, 0), + } +} + +// Register add new service to the registry under given name. +func (r *registry) Register(name string, service Service) { + r.mu.Lock() + defer r.mu.Unlock() + + r.candidates = append(r.candidates, &entry{ + Name: name, + Service: service, + Serving: false, + }) + + r.log.Debugf("%s.service: registered", name) +} + +// Configure configures all underlying services with given configuration. +func (r *registry) Configure(cfg Config) error { + if r.configured != nil { + return fmt.Errorf("service bus has been already configured") + } + + r.configured = make([]*entry, 0) + for _, e := range r.candidates { + segment := cfg.Get(e.Name) + if segment == nil { + r.log.Debugf("%s.service: no config has been provided", e.Name) + continue + } + + s, err := e.Service.WithConfig(segment, r) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name)) + } + + if s != nil { + r.configured = append(r.configured, &entry{ + Name: e.Name, + Service: s, + Serving: false, + }) + } + } + + return nil +} + +// Check is Service has been registered. +func (r *registry) Has(service string) bool { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return true + } + } + + return false +} + +// Get returns Service instance by it's Name or nil if Service not found. +func (r *registry) Get(service string) Service { + r.mu.Lock() + defer r.mu.Unlock() + + for _, e := range r.configured { + if e.Name == service { + return e.Service + } + } + + return nil +} + +// Serve all configured services. Non blocking. +func (r *registry) Serve() error { + return nil +} + +// Stop all active services. +func (r *registry) Stop() error { + return nil +} diff --git a/service/rpc.go b/service/rpc.go deleted file mode 100644 index eb128768..00000000 --- a/service/rpc.go +++ /dev/null @@ -1,32 +0,0 @@ -package service - -import ( - "net" - "strings" -) - -type RPCConfig struct { - Listen string -} - -func (cfg *RPCConfig) CreateListener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Listen(dsn[0], dsn[1]) -} - -func (cfg *RPCConfig) CreateDialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") - if len(dsn) != 2 { - return nil, dsnError - } - - return net.Dial(dsn[0], dsn[1]) -} - -func NewBus() *Bus { - return &Bus{services: make([]Service, 0)} -} diff --git a/service/service.go b/service/service.go deleted file mode 100644 index 2f704657..00000000 --- a/service/service.go +++ /dev/null @@ -1,9 +0,0 @@ -package service - -type Service interface { - Name() string - Configure(cfg Config) (bool, error) - RPC() interface{} - Serve() error - Stop() error -} |