summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-05 16:56:12 +0300
committerWolfy-J <[email protected]>2018-06-05 16:56:12 +0300
commit3112f9b58c73773cea972fd79f04d33f8f7d7edd (patch)
tree334941e56becd1dde9fd1ce353e63d63775d772b
parent76ff8d1c95e087749d559ee5a4f8f0348feafffa (diff)
Cs and refactoring
-rw-r--r--cmd/_____/factory.go34
-rw-r--r--server.go8
-rw-r--r--server_config.go47
-rw-r--r--socket_factory.go5
-rw-r--r--socket_factory_test.go25
-rw-r--r--worker.go2
6 files changed, 86 insertions, 35 deletions
diff --git a/cmd/_____/factory.go b/cmd/_____/factory.go
index 8ecf90ca..cee962e2 100644
--- a/cmd/_____/factory.go
+++ b/cmd/_____/factory.go
@@ -2,7 +2,6 @@ package _____
import (
"github.com/spiral/roadrunner"
- "net"
"os/exec"
"strings"
"time"
@@ -23,21 +22,6 @@ type PoolConfig struct {
}
}
-func (f *PoolConfig) NewServer() (*roadrunner.Server, func(), error) {
- relays, terminator, err := f.relayFactory()
- if err != nil {
- terminator()
- return nil, nil, err
- }
-
- rr := roadrunner.NewServer(f.cmd(), relays)
- if err := rr.Configure(f.rrConfig()); err != nil {
- return nil, nil, err
- }
-
- return rr, nil, nil
-}
-
func (f *PoolConfig) rrConfig() roadrunner.Config {
return roadrunner.Config{
NumWorkers: f.Number,
@@ -51,21 +35,3 @@ func (f *PoolConfig) cmd() func() *exec.Cmd {
cmd := strings.Split(f.Command, " ")
return func() *exec.Cmd { return exec.Command(cmd[0], cmd[1:]...) }
}
-
-func (f *PoolConfig) relayFactory() (roadrunner.Factory, func(), error) {
- if f.Relay == "pipes" || f.Relay == "pipe" {
- return roadrunner.NewPipeFactory(), nil, nil
- }
-
- dsn := strings.Split(f.Relay, "://")
- if len(dsn) != 2 {
- return nil, nil, dsnError
- }
-
- ln, err := net.Listen(dsn[0], dsn[1])
- if err != nil {
- return nil, nil, err
- }
-
- return roadrunner.NewSocketFactory(ln, time.Minute), func() { ln.Close() }, nil
-}
diff --git a/server.go b/server.go
index 0291b90f..f6b82bad 100644
--- a/server.go
+++ b/server.go
@@ -128,6 +128,14 @@ func (r *Server) Destroy() {
r.pool = nil
}
+func (r *Server) Start() {
+ // ????
+}
+
+func (r *Server) Stop() {
+ // stop factory?
+}
+
// throw invokes event handler if any.
func (r *Server) throw(event int, ctx interface{}) {
if r.observer != nil {
diff --git a/server_config.go b/server_config.go
new file mode 100644
index 00000000..2816a70b
--- /dev/null
+++ b/server_config.go
@@ -0,0 +1,47 @@
+package roadrunner
+
+import (
+ "time"
+ "strings"
+ "net"
+ "errors"
+)
+
+const (
+ FactoryPipes = iota
+ FactorySocket
+)
+
+type ServerConfig struct {
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // FactoryTimeout defines for how long socket factory will be waiting for worker connection. For socket factory only.
+ // This config section must not change on re-configuration.
+ FactoryTimeout time.Duration
+
+ // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
+ // while server is running.
+ Pool Config
+}
+
+// buildFactory creates and connects new factory instance based on given parameters.
+func (f *ServerConfig) buildFactory() (Factory, error) {
+ if f.Relay == "pipes" {
+ return NewPipeFactory(), nil
+ }
+
+ dsn := strings.Split(f.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)")
+ }
+
+ ln, err := net.Listen(dsn[0], dsn[1])
+ if err != nil {
+ return nil, nil
+ }
+
+ return NewSocketFactory(ln, time.Second*f.FactoryTimeout), nil
+}
diff --git a/socket_factory.go b/socket_factory.go
index c7fe639d..70f70f30 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -70,6 +70,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
return w, nil
}
+// Close socket factory and underlying socket connection.
+func (f *SocketFactory) Close() error {
+ return f.ls.Close()
+}
+
// listens for incoming socket connections
func (f *SocketFactory) listen() {
for {
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 3bb60ab0..f6b1350c 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -32,6 +32,31 @@ func Test_Tcp_Start(t *testing.T) {
w.Stop()
}
+func Test_Tcp_StartCloseFactory(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "php-src/tests/client.php", "echo", "tcp")
+
+ f := NewSocketFactory(ls, time.Minute)
+ defer f.Close()
+
+ w, err := f.SpawnWorker(cmd)
+ assert.NoError(t, err)
+ assert.NotNil(t, w)
+
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+
+ w.Stop()
+}
+
func Test_Tcp_StartError(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
diff --git a/worker.go b/worker.go
index a29e514a..6bd317d8 100644
--- a/worker.go
+++ b/worker.go
@@ -30,7 +30,7 @@ type Worker struct {
state *state
// underlying command with associated process, command must be
- // provided to worker from outside in non-started form. Command
+ // provided to worker from outside in non-started form. Cmd
// stdErr direction will be handled by worker to aggregate error message.
cmd *exec.Cmd