summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/_____/bus.go94
-rw-r--r--cmd/_____/factory.go (renamed from service/factory.go)8
-rw-r--r--cmd/_____/http/config.go (renamed from http/config.go)2
-rw-r--r--cmd/_____/http/data.go (renamed from http/data.go)0
-rw-r--r--cmd/_____/http/request.go (renamed from http/request.go)0
-rw-r--r--cmd/_____/http/response.go (renamed from http/response.go)0
-rw-r--r--cmd/_____/http/rpc.go (renamed from http/rpc.go)2
-rw-r--r--cmd/_____/http/server.go (renamed from http/server.go)0
-rw-r--r--cmd/_____/http/service.go (renamed from http/service.go)4
-rw-r--r--cmd/_____/http/static.go (renamed from http/static.go)0
-rw-r--r--cmd/_____/http/uploads.go (renamed from http/uploads.go)0
-rw-r--r--cmd/_____/utils/size.go (renamed from utils/size.go)0
-rw-r--r--cmd/_____/utils/workers.go (renamed from utils/workers.go)2
-rw-r--r--cmd/_____/verbose.go (renamed from cmd/rr/utils/verbose.go)2
-rw-r--r--cmd/rr/.rr.yaml3
-rw-r--r--cmd/rr/cmd/root.go28
-rw-r--r--cmd/rr/http/register.go23
-rw-r--r--cmd/rr/http/reload.go28
-rw-r--r--cmd/rr/http/workers.go59
-rw-r--r--cmd/rr/main.go10
-rw-r--r--cmd/rr/utils/config.go11
-rw-r--r--rpc/config.go35
-rw-r--r--rpc/service.go99
-rw-r--r--service/bus.go165
-rw-r--r--service/config.go6
-rw-r--r--service/registry.go162
-rw-r--r--service/rpc.go32
-rw-r--r--service/service.go9
28 files changed, 490 insertions, 294 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go
new file mode 100644
index 00000000..813a6c3b
--- /dev/null
+++ b/cmd/_____/bus.go
@@ -0,0 +1,94 @@
+package _____
+
+import (
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "net/rpc"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+var (
+ dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+)
+
+type Bus struct {
+ services []Service
+ wg sync.WaitGroup
+ enabled []Service
+ stop chan interface{}
+ rpc *rpc.Server
+ rpcConfig *RPCConfig
+}
+
+func (b *Bus) Register(s Service) {
+ b.services = append(b.services, s)
+}
+
+func (b *Bus) Services() []Service {
+ return b.services
+}
+
+func (b *Bus) Configure(cfg Config) error {
+ b.enabled = make([]Service, 0)
+
+ for _, s := range b.services {
+ segment := cfg.Get(s.Name())
+ if segment == nil {
+ // no config has been provided for the Service
+ logrus.Debugf("%s: no config has been provided", s.Name())
+ continue
+ }
+
+ if enable, err := s.Configure(segment); err != nil {
+ return err
+ } else if !enable {
+ continue
+ }
+
+ b.enabled = append(b.enabled, s)
+ }
+
+ return nil
+}
+
+func (b *Bus) Serve() {
+ b.rpc = rpc.NewServer()
+
+ for _, s := range b.enabled {
+ // some candidates might provide net/rpc api for internal communications
+ if api := s.RPC(); api != nil {
+ b.rpc.RegisterName(s.Name(), api)
+ }
+
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+
+ if err := s.Serve(); err != nil {
+ logrus.Errorf("%s.start: %s", s.Name(), err)
+ }
+ }()
+ }
+
+ b.wg.Wait()
+}
+
+func (b *Bus) Stop() {
+ for _, s := range b.enabled {
+ if err := s.Stop(); err != nil {
+ logrus.Errorf("%s.stop: %s", s.Name(), err)
+ }
+ }
+
+ b.wg.Wait()
+}
diff --git a/service/factory.go b/cmd/_____/factory.go
index e4a599e6..8ecf90ca 100644
--- a/service/factory.go
+++ b/cmd/_____/factory.go
@@ -1,4 +1,4 @@
-package service
+package _____
import (
"github.com/spiral/roadrunner"
@@ -8,6 +8,7 @@ import (
"time"
)
+// todo: move out
type PoolConfig struct {
Command string
Relay string
@@ -16,8 +17,9 @@ type PoolConfig struct {
MaxJobs uint64
Timeouts struct {
- Allocate int
- Destroy int
+ Construct int
+ Allocate int
+ Destroy int
}
}
diff --git a/http/config.go b/cmd/_____/http/config.go
index 2a64dbab..54e39a7d 100644
--- a/http/config.go
+++ b/cmd/_____/http/config.go
@@ -3,7 +3,7 @@ package http
import (
"fmt"
"github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/utils"
+ "github.com/spiral/roadrunner/cmd/_____/utils"
"os"
"path"
"strings"
diff --git a/http/data.go b/cmd/_____/http/data.go
index e6b8344f..e6b8344f 100644
--- a/http/data.go
+++ b/cmd/_____/http/data.go
diff --git a/http/request.go b/cmd/_____/http/request.go
index fd483744..fd483744 100644
--- a/http/request.go
+++ b/cmd/_____/http/request.go
diff --git a/http/response.go b/cmd/_____/http/response.go
index 2736c4ab..2736c4ab 100644
--- a/http/response.go
+++ b/cmd/_____/http/response.go
diff --git a/http/rpc.go b/cmd/_____/http/rpc.go
index 38db9a61..1bc8a06b 100644
--- a/http/rpc.go
+++ b/cmd/_____/http/rpc.go
@@ -2,7 +2,7 @@ package http
import (
"github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner/utils"
+ "github.com/spiral/roadrunner/cmd/_____/utils"
"github.com/pkg/errors"
)
diff --git a/http/server.go b/cmd/_____/http/server.go
index db1f22ef..db1f22ef 100644
--- a/http/server.go
+++ b/cmd/_____/http/server.go
diff --git a/http/service.go b/cmd/_____/http/service.go
index 5d45240b..008aeab8 100644
--- a/http/service.go
+++ b/cmd/_____/http/service.go
@@ -8,6 +8,8 @@ import (
"github.com/spiral/roadrunner"
)
+const ServiceName = "http"
+
type Service struct {
cfg *serviceConfig
http *http.Server
@@ -15,7 +17,7 @@ type Service struct {
}
func (s *Service) Name() string {
- return "http"
+ return ServiceName
}
func (s *Service) Configure(cfg service.Config) (bool, error) {
diff --git a/http/static.go b/cmd/_____/http/static.go
index d7030c3f..d7030c3f 100644
--- a/http/static.go
+++ b/cmd/_____/http/static.go
diff --git a/http/uploads.go b/cmd/_____/http/uploads.go
index 468e8a19..468e8a19 100644
--- a/http/uploads.go
+++ b/cmd/_____/http/uploads.go
diff --git a/utils/size.go b/cmd/_____/utils/size.go
index 176cc9e1..176cc9e1 100644
--- a/utils/size.go
+++ b/cmd/_____/utils/size.go
diff --git a/utils/workers.go b/cmd/_____/utils/workers.go
index 0c4f778f..1024b4c6 100644
--- a/utils/workers.go
+++ b/cmd/_____/utils/workers.go
@@ -34,4 +34,4 @@ func FetchWorkers(srv *roadrunner.Server) (result []Worker) {
}
return
-}
+} \ No newline at end of file
diff --git a/cmd/rr/utils/verbose.go b/cmd/_____/verbose.go
index 43770f34..d0088b69 100644
--- a/cmd/rr/utils/verbose.go
+++ b/cmd/_____/verbose.go
@@ -1,4 +1,4 @@
-package utils
+package _____
//if f.Verbose {
// rr.Observe(func(event int, ctx interface{}) {
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index 2717e187..6e3b689b 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -1,5 +1,8 @@
# rpc bus allows php application and external clients to talk to rr services.
rpc:
+ # enable rpc server
+ enable: true
+
# rpc connection DSN. Supported TCP and Unix sockets.
listen: tcp://127.0.0.1:6001
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index d0cac5ef..d54437f0 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -31,24 +31,29 @@ import (
// Service bus for all the commands.
var (
- // Shared service bus.
- Services = service.NewBus()
+ cfgFile string
+ verbose bool
+
+ // Logger - shared logger.
+ Logger = logrus.New()
+
+ // Services - shared service bus.
+ Services = service.NewRegistry(Logger)
// CLI is application endpoint.
CLI = &cobra.Command{
- Use: "rr",
- Short: "RoadRunner, PHP application server",
+ Use: "rr",
+ SilenceErrors: true,
+ SilenceUsage: true,
+ Short: utils.Sprintf("<green>RoadRunner, PHP Application Server.</reset>"),
}
-
- cfgFile string
- verbose bool
)
// Execute adds all child commands to the CLI command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the CLI.
func Execute() {
if err := CLI.Execute(); err != nil {
- logrus.Error(err)
+ utils.Printf("Error: <red>%s</reset>\n", err)
os.Exit(1)
}
}
@@ -59,7 +64,7 @@ func init() {
cobra.OnInitialize(func() {
if verbose {
- logrus.SetLevel(logrus.DebugLevel)
+ Logger.SetLevel(logrus.DebugLevel)
}
if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
@@ -81,6 +86,7 @@ func initConfig(cfgFile string, path []string, name string) service.Config {
for _, p := range path {
cfg.AddConfigPath(p)
}
+
cfg.SetConfigName(name)
}
@@ -89,9 +95,9 @@ func initConfig(cfgFile string, path []string, name string) service.Config {
// If a cfg file is found, read it in.
if err := cfg.ReadInConfig(); err != nil {
- logrus.Warnf("config: %s", err)
+ Logger.Warnf("config: %s", err)
return nil
}
- return &utils.ConfigWrapper{cfg}
+ return &utils.ViperWrapper{Viper: cfg}
}
diff --git a/cmd/rr/http/register.go b/cmd/rr/http/register.go
deleted file mode 100644
index fb828578..00000000
--- a/cmd/rr/http/register.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package http
-
-import (
- "github.com/spf13/cobra"
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/http"
-)
-
-func init() {
- rr.Services.Register(&http.Service{})
-
- rr.CLI.AddCommand(&cobra.Command{
- Use: "http:reload",
- Short: "Reload RoadRunner worker pools for the HTTP service",
- Run: reloadHandler,
- })
-
- rr.CLI.AddCommand(&cobra.Command{
- Use: "http:workers",
- Short: "List workers associated with RoadRunner HTTP service",
- Run: workersHandler,
- })
-}
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go
index 6cdba576..0fd3d7e9 100644
--- a/cmd/rr/http/reload.go
+++ b/cmd/rr/http/reload.go
@@ -23,22 +23,34 @@ package http
import (
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/sirupsen/logrus"
+ "github.com/go-errors/errors"
+ "github.com/spiral/roadrunner/rpc"
)
-func reloadHandler(cmd *cobra.Command, args []string) {
- client, err := rr.Services.RCPClient()
+func init() {
+ rr.CLI.AddCommand(&cobra.Command{
+ Use: "http:reload",
+ Short: "Reload RoadRunner worker pools for the HTTP service",
+ RunE: reloadHandler,
+ })
+}
+
+func reloadHandler(cmd *cobra.Command, args []string) error {
+ if !rr.Services.Has("rpc") {
+ return errors.New("RPC service is not configured")
+ }
+
+ client, err := rr.Services.Get("rpc").(*rpc.Service).Client()
if err != nil {
- logrus.Error(err)
- return
+ return err
}
defer client.Close()
var r string
if err := client.Call("http.Reset", true, &r); err != nil {
- logrus.Error(err)
- return
+ return err
}
- logrus.Info("restarting http worker pool")
+ rr.Logger.Info("http.service: restarting worker pool")
+ return nil
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index 13e8d21c..63ef0cce 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -21,38 +21,47 @@
package http
import (
- "github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/http"
- "os"
- "strconv"
- "github.com/sirupsen/logrus"
+ "errors"
+ "github.com/spiral/roadrunner/rpc"
)
-func workersHandler(cmd *cobra.Command, args []string) {
- client, err := rr.Services.RCPClient()
- if err != nil {
- logrus.Error(err)
- return
- }
- defer client.Close()
+func init() {
+ rr.CLI.AddCommand(&cobra.Command{
+ Use: "http:workers",
+ Short: "List workers associated with RoadRunner HTTP service",
+ RunE: workersHandler,
+ })
+}
- var r http.WorkerList
- if err := client.Call("http.Workers", true, &r); err != nil {
- panic(err)
+func workersHandler(cmd *cobra.Command, args []string) error {
+ if !rr.Services.Has("rpc") {
+ return errors.New("RPC service is not configured")
}
- tw := tablewriter.NewWriter(os.Stdout)
- tw.SetHeader([]string{"PID", "Status", "Num Execs"})
-
- for _, w := range r.Workers {
- tw.Append([]string{
- strconv.Itoa(w.Pid),
- w.Status,
- strconv.Itoa(int(w.NumExecs)),
- })
+ client, err := rr.Services.Get("rpc").(*rpc.Service).Client()
+ if err != nil {
+ return err
}
+ defer client.Close()
- tw.Render()
+ //var r http.WorkerList
+ //if err := client.Call("http.Workers", true, &r); err != nil {
+ // panic(err)
+ //}
+ //
+ //tw := tablewriter.NewWriter(os.Stdout)
+ //tw.SetHeader([]string{"PID", "Status", "Num Execs"})
+ //
+ //for _, w := range r.Workers {
+ // tw.Append([]string{
+ // strconv.Itoa(w.Pid),
+ // w.Status,
+ // strconv.Itoa(int(w.NumExecs)),
+ // })
+ //}
+ //
+ //tw.Render()
+ return nil
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 9d6f685c..26f70fdd 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -23,13 +23,17 @@
package main
import (
- "github.com/spiral/roadrunner/cmd/rr/cmd"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/rpc"
- // service plugins
+ // cli plugins
_ "github.com/spiral/roadrunner/cmd/rr/http"
)
func main() {
+ // provides ability to make local connection to services
+ rr.Services.Register("rpc", new(rpc.Service))
+
// you can register additional commands using cmd.CLI
- cmd.Execute()
+ rr.Execute()
}
diff --git a/cmd/rr/utils/config.go b/cmd/rr/utils/config.go
index e7e22b3a..452dd195 100644
--- a/cmd/rr/utils/config.go
+++ b/cmd/rr/utils/config.go
@@ -5,19 +5,22 @@ import (
"github.com/spiral/roadrunner/service"
)
-type ConfigWrapper struct {
+// ViperWrapper provides interface bridge between Viper configs and service.Config.
+type ViperWrapper struct {
Viper *viper.Viper
}
-func (w *ConfigWrapper) Get(key string) service.Config {
+// Get nested config section (sub-map), returns nil if section not found.
+func (w *ViperWrapper) Get(key string) service.Config {
sub := w.Viper.Sub(key)
if sub == nil {
return nil
}
- return &ConfigWrapper{sub}
+ return &ViperWrapper{sub}
}
-func (w *ConfigWrapper) Unmarshal(out interface{}) error {
+// Unmarshal unmarshal config data into given struct.
+func (w *ViperWrapper) Unmarshal(out interface{}) error {
return w.Viper.Unmarshal(out)
}
diff --git a/rpc/config.go b/rpc/config.go
new file mode 100644
index 00000000..67dc1094
--- /dev/null
+++ b/rpc/config.go
@@ -0,0 +1,35 @@
+package rpc
+
+import (
+ "errors"
+ "net"
+ "strings"
+)
+
+type config struct {
+ // Indicates if RPC connection is enabled.
+ Enable bool
+
+ // Listen string
+ Listen string
+}
+
+// listener creates new rpc socket listener.
+func (cfg *config) listener() (net.Listener, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+ }
+
+ return net.Listen(dsn[0], dsn[1])
+}
+
+// dialer creates rpc socket dialer.
+func (cfg *config) dialer() (net.Conn, error) {
+ dsn := strings.Split(cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
diff --git a/rpc/service.go b/rpc/service.go
new file mode 100644
index 00000000..61a9a1a3
--- /dev/null
+++ b/rpc/service.go
@@ -0,0 +1,99 @@
+package rpc
+
+import (
+ "errors"
+ "github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
+ "net/rpc"
+)
+
+// Service is RPC service.
+type Service struct {
+ cfg *config
+ stop chan interface{}
+ rpc *rpc.Server
+}
+
+// WithConfig must return Service instance configured with the given environment. Must return error in case of
+// misconfiguration, might return nil as Service if Service is not enabled.
+func (s *Service) WithConfig(cfg service.Config, reg service.Registry) (service.Service, error) {
+ config := &config{}
+ if err := cfg.Unmarshal(config); err != nil {
+ return nil, err
+ }
+
+ if !config.Enable {
+ return nil, nil
+ }
+
+ return &Service{cfg: config, rpc: rpc.NewServer()}, nil
+}
+
+// Serve serves Service.
+func (s *Service) Serve() error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ s.stop = make(chan interface{})
+
+ ln, err := s.cfg.listener()
+ if err != nil {
+ return err
+ }
+ defer ln.Close()
+
+ for {
+ select {
+ case <-s.stop:
+ return nil
+ default:
+ conn, err := ln.Accept()
+ if err != nil {
+ continue
+ }
+
+ s.rpc.Accept(ln)
+
+ go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ }
+ }
+
+ return nil
+}
+
+// Stop stop Service Service.
+func (s *Service) Stop() error {
+ close(s.stop)
+ return nil
+}
+
+// Register publishes in the server the set of methods of the
+// receiver value that satisfy the following conditions:
+// - exported method of exported type
+// - two arguments, both of exported type
+// - the second argument is a pointer
+// - one return value, of type error
+// It returns an error if the receiver is not an exported type or has
+// no suitable methods. It also logs the error using package log.
+func (s *Service) Register(name string, rcvr interface{}) error {
+ if s.rpc == nil {
+ return errors.New("RPC service is not configured")
+ }
+
+ return s.rpc.RegisterName(name, rcvr)
+}
+
+// Client creates new RPC client.
+func (s *Service) Client() (*rpc.Client, error) {
+ if s.cfg == nil {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ conn, err := s.cfg.dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+}
diff --git a/service/bus.go b/service/bus.go
deleted file mode 100644
index 8bfb914c..00000000
--- a/service/bus.go
+++ /dev/null
@@ -1,165 +0,0 @@
-package service
-
-import (
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "github.com/spiral/goridge"
- "net/rpc"
- "sync"
-)
-
-const (
- rpcConfig = "rpc"
-)
-
-var (
- dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
-)
-
-type Bus struct {
- wg sync.WaitGroup
- services []Service
- enabled []Service
- stop chan interface{}
- rpc *rpc.Server
- rpcConfig *RPCConfig
-}
-
-func (b *Bus) Register(s Service) {
- b.services = append(b.services, s)
-}
-
-func (b *Bus) Services() []Service {
- return b.services
-}
-
-func (b *Bus) Configure(cfg Config) error {
- if segment := cfg.Get(rpcConfig); segment == nil {
- logrus.Warn("rpc: no config has been provided")
- } else {
- b.rpcConfig = &RPCConfig{}
- if err := segment.Unmarshal(b.rpcConfig); err != nil {
- return err
- }
- }
-
- b.enabled = make([]Service, 0)
-
- for _, s := range b.services {
- segment := cfg.Get(s.Name())
- if segment == nil {
- // no config has been provided for the service
- logrus.Debugf("%s: no config has been provided", s.Name())
- continue
- }
-
- if enable, err := s.Configure(segment); err != nil {
- return err
- } else if !enable {
- continue
- }
-
- b.enabled = append(b.enabled, s)
- }
-
- return nil
-}
-
-func (b *Bus) RCPClient() (*rpc.Client, error) {
- if b.rpcConfig == nil {
- return nil, errors.New("rpc is not configured")
- }
-
- conn, err := b.rpcConfig.CreateDialer()
- if err != nil {
- return nil, err
- }
-
- return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
-}
-
-func (b *Bus) Serve() {
- b.rpc = rpc.NewServer()
-
- for _, s := range b.enabled {
- // some services might provide net/rpc api for internal communications
- if api := s.RPC(); api != nil {
- b.rpc.RegisterName(s.Name(), api)
- }
-
- b.wg.Add(1)
- go func() {
- defer b.wg.Done()
-
- if err := s.Serve(); err != nil {
- logrus.Errorf("%s.start: %s", s.Name(), err)
- }
- }()
- }
-
- b.wg.Add(1)
- go func() {
- defer b.wg.Done()
-
- logrus.Debug("rpc: started")
- if err := b.serveRPC(); err != nil {
- logrus.Errorf("rpc: %s", err)
- }
- }()
-
- b.wg.Wait()
-}
-
-func (b *Bus) Stop() {
- if err := b.stopRPC(); err != nil {
- logrus.Errorf("rpc: ", err)
- }
-
- for _, s := range b.enabled {
- if err := s.Stop(); err != nil {
- logrus.Errorf("%s.stop: %s", s.Name(), err)
- }
- }
-
- b.wg.Wait()
-}
-
-func (b *Bus) serveRPC() error {
- if b.rpcConfig == nil {
- return nil
- }
-
- b.stop = make(chan interface{})
-
- ln, err := b.rpcConfig.CreateListener()
- if err != nil {
- return err
- }
- defer ln.Close()
-
- for {
- select {
- case <-b.stop:
- b.rpc = nil
- return nil
- default:
- conn, err := ln.Accept()
- if err != nil {
- continue
- }
-
- go b.rpc.ServeCodec(goridge.NewCodec(conn))
- }
- }
-
- return nil
-}
-
-func (b *Bus) stopRPC() error {
- if b.rpcConfig == nil {
- return nil
- }
-
- close(b.stop)
- return nil
-}
diff --git a/service/config.go b/service/config.go
deleted file mode 100644
index d5381376..00000000
--- a/service/config.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package service
-
-type Config interface {
- Get(key string) Config
- Unmarshal(out interface{}) error
-}
diff --git a/service/registry.go b/service/registry.go
new file mode 100644
index 00000000..d4e2ff12
--- /dev/null
+++ b/service/registry.go
@@ -0,0 +1,162 @@
+package service
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "sync"
+)
+
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // Get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+// Registry controls all internal RR services and provides plugin based system.
+type Registry interface {
+ // Register add new service to the registry under given name.
+ Register(name string, service Service)
+
+ // Configure configures all underlying services with given configuration.
+ Configure(cfg Config) error
+
+ // Check is Service has been registered and configured.
+ Has(service string) bool
+
+ // Get returns Service instance by it's Name or nil if Service not found. Method must return only configured instance.
+ Get(service string) Service
+
+ // Serve all configured services. Non blocking.
+ Serve() error
+
+ // Stop all active services.
+ Stop() error
+}
+
+// Service provides high level functionality for road runner Service.
+type Service interface {
+ // WithConfig must return Service instance configured with the given environment. Must return error in case of
+ // misconfiguration, might return nil as Service if Service is not enabled.
+ WithConfig(cfg Config, reg Registry) (Service, error)
+
+ // Serve serves Service.
+ Serve() error
+
+ // Stop stop Service Service.
+ Stop() error
+}
+
+type registry struct {
+ log logrus.FieldLogger
+ mu sync.Mutex
+ candidates []*entry
+ configured []*entry
+}
+
+// entry creates association between service instance and given name.
+type entry struct {
+ // Associated service name
+ Name string
+
+ // Associated service instance
+ Service Service
+
+ // Serving indicates that service is currently serving
+ Serving bool
+}
+
+// NewRegistry creates new registry.
+func NewRegistry(log logrus.FieldLogger) Registry {
+ return &registry{
+ log: log,
+ candidates: make([]*entry, 0),
+ }
+}
+
+// Register add new service to the registry under given name.
+func (r *registry) Register(name string, service Service) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ r.candidates = append(r.candidates, &entry{
+ Name: name,
+ Service: service,
+ Serving: false,
+ })
+
+ r.log.Debugf("%s.service: registered", name)
+}
+
+// Configure configures all underlying services with given configuration.
+func (r *registry) Configure(cfg Config) error {
+ if r.configured != nil {
+ return fmt.Errorf("service bus has been already configured")
+ }
+
+ r.configured = make([]*entry, 0)
+ for _, e := range r.candidates {
+ segment := cfg.Get(e.Name)
+ if segment == nil {
+ r.log.Debugf("%s.service: no config has been provided", e.Name)
+ continue
+ }
+
+ s, err := e.Service.WithConfig(segment, r)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("%s.service", e.Name))
+ }
+
+ if s != nil {
+ r.configured = append(r.configured, &entry{
+ Name: e.Name,
+ Service: s,
+ Serving: false,
+ })
+ }
+ }
+
+ return nil
+}
+
+// Check is Service has been registered.
+func (r *registry) Has(service string) bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.configured {
+ if e.Name == service {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Get returns Service instance by it's Name or nil if Service not found.
+func (r *registry) Get(service string) Service {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ for _, e := range r.configured {
+ if e.Name == service {
+ return e.Service
+ }
+ }
+
+ return nil
+}
+
+// Serve all configured services. Non blocking.
+func (r *registry) Serve() error {
+ return nil
+}
+
+// Stop all active services.
+func (r *registry) Stop() error {
+ return nil
+}
diff --git a/service/rpc.go b/service/rpc.go
deleted file mode 100644
index eb128768..00000000
--- a/service/rpc.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package service
-
-import (
- "net"
- "strings"
-)
-
-type RPCConfig struct {
- Listen string
-}
-
-func (cfg *RPCConfig) CreateListener() (net.Listener, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, dsnError
- }
-
- return net.Listen(dsn[0], dsn[1])
-}
-
-func (cfg *RPCConfig) CreateDialer() (net.Conn, error) {
- dsn := strings.Split(cfg.Listen, "://")
- if len(dsn) != 2 {
- return nil, dsnError
- }
-
- return net.Dial(dsn[0], dsn[1])
-}
-
-func NewBus() *Bus {
- return &Bus{services: make([]Service, 0)}
-}
diff --git a/service/service.go b/service/service.go
deleted file mode 100644
index 2f704657..00000000
--- a/service/service.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package service
-
-type Service interface {
- Name() string
- Configure(cfg Config) (bool, error)
- RPC() interface{}
- Serve() error
- Stop() error
-}