diff options
author | Wolfy-J <[email protected]> | 2018-06-05 16:56:12 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-05 16:56:12 +0300 |
commit | 3112f9b58c73773cea972fd79f04d33f8f7d7edd (patch) | |
tree | 334941e56becd1dde9fd1ce353e63d63775d772b | |
parent | 76ff8d1c95e087749d559ee5a4f8f0348feafffa (diff) |
Cs and refactoring
-rw-r--r-- | cmd/_____/factory.go | 34 | ||||
-rw-r--r-- | server.go | 8 | ||||
-rw-r--r-- | server_config.go | 47 | ||||
-rw-r--r-- | socket_factory.go | 5 | ||||
-rw-r--r-- | socket_factory_test.go | 25 | ||||
-rw-r--r-- | worker.go | 2 |
6 files changed, 86 insertions, 35 deletions
diff --git a/cmd/_____/factory.go b/cmd/_____/factory.go index 8ecf90ca..cee962e2 100644 --- a/cmd/_____/factory.go +++ b/cmd/_____/factory.go @@ -2,7 +2,6 @@ package _____ import ( "github.com/spiral/roadrunner" - "net" "os/exec" "strings" "time" @@ -23,21 +22,6 @@ type PoolConfig struct { } } -func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) { - relays, terminator, err := f.relayFactory() - if err != nil { - terminator() - return nil, nil, err - } - - rr := roadrunner.NewServer(f.cmd(), relays) - if err := rr.Configure(f.rrConfig()); err != nil { - return nil, nil, err - } - - return rr, nil, nil -} - func (f *PoolConfig) rrConfig() roadrunner.Config { return roadrunner.Config{ NumWorkers: f.Number, @@ -51,21 +35,3 @@ func (f *PoolConfig) cmd() func() *exec.Cmd { cmd := strings.Split(f.Command, " ") return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) } } - -func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) { - if f.Relay == "pipes" || f.Relay == "pipe" { - return roadrunner.NewPipeFactory(), nil, nil - } - - dsn := strings.Split(f.Relay, "://") - if len(dsn) != 2 { - return nil, nil, dsnError - } - - ln, err := net.Listen(dsn[0], dsn[1]) - if err != nil { - return nil, nil, err - } - - return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil -} @@ -128,6 +128,14 @@ func (r *Server) Destroy() { r.pool = nil } +func (r *Server) Start() { + // ???? +} + +func (r *Server) Stop() { + // stop factory? +} + // throw invokes event handler if any. func (r *Server) throw(event int, ctx interface{}) { if r.observer != nil { diff --git a/server_config.go b/server_config.go new file mode 100644 index 00000000..2816a70b --- /dev/null +++ b/server_config.go @@ -0,0 +1,47 @@ +package roadrunner + +import ( + "time" + "strings" + "net" + "errors" +) + +const ( + FactoryPipes = iota + FactorySocket +) + +type ServerConfig struct { + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // FactoryTimeout defines for how long socket factory will be waiting for worker connection. For socket factory only. + // This config section must not change on re-configuration. + FactoryTimeout time.Duration + + // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change + // while server is running. + Pool Config +} + +// buildFactory creates and connects new factory instance based on given parameters. +func (f *ServerConfig) buildFactory() (Factory, error) { + if f.Relay == "pipes" { + return NewPipeFactory(), nil + } + + dsn := strings.Split(f.Relay, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)") + } + + ln, err := net.Listen(dsn[0], dsn[1]) + if err != nil { + return nil, nil + } + + return NewSocketFactory(ln, time.Second*f.FactoryTimeout), nil +} diff --git a/socket_factory.go b/socket_factory.go index c7fe639d..70f70f30 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -70,6 +70,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { return w, nil } +// Close socket factory and underlying socket connection. +func (f *SocketFactory) Close() error { + return f.ls.Close() +} + // listens for incoming socket connections func (f *SocketFactory) listen() { for { diff --git a/socket_factory_test.go b/socket_factory_test.go index 3bb60ab0..f6b1350c 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -32,6 +32,31 @@ func Test_Tcp_Start(t *testing.T) { w.Stop() } +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp") + + f := NewSocketFactory(ls, time.Minute) + defer f.Close() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + w.Stop() +} + func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket @@ -30,7 +30,7 @@ type Worker struct { state *state // underlying command with associated process, command must be - // provided to worker from outside in non-started form. Command + // provided to worker from outside in non-started form. Cmd // stdErr direction will be handled by worker to aggregate error message. cmd *exec.Cmd |