summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/_____/bus.go94
-rw-r--r--cmd/_____/http/static.go4
-rw-r--r--cmd/rr/http/reload.go2
-rw-r--r--cmd/rr/http/workers.go2
-rw-r--r--ext/config_test.go (renamed from config_test.go)2
-rw-r--r--ext/errors_test.go (renamed from errors_test.go)2
-rw-r--r--ext/pipe_factory_test.go (renamed from pipe_factory_test.go)2
-rw-r--r--ext/protocol_test.go (renamed from protocol_test.go)2
-rw-r--r--ext/socket_factory_test.go (renamed from socket_factory_test.go)2
-rw-r--r--factory.go3
-rw-r--r--http/data.go (renamed from cmd/_____/http/data.go)0
-rw-r--r--http/request.go (renamed from cmd/_____/http/request.go)0
-rw-r--r--http/response.go (renamed from cmd/_____/http/response.go)0
-rw-r--r--http/server.go1
-rw-r--r--http/uploads.go (renamed from cmd/_____/http/uploads.go)23
-rw-r--r--pipe_factory.go5
-rw-r--r--rpc/service.go2
-rw-r--r--server.go30
-rw-r--r--server_config.go12
-rw-r--r--service/registry.go6
-rw-r--r--state.go12
-rw-r--r--static_pool.go20
-rw-r--r--static_pool_test.go147
-rw-r--r--worker.go25
24 files changed, 200 insertions, 198 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go
deleted file mode 100644
index 813a6c3b..00000000
--- a/cmd/_____/bus.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package _____
-
-import (
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "net/rpc"
- "sync"
-)
-
-// Config provides ability to slice configuration sections and unmarshal configuration data into
-// given structure.
-type Config interface {
- // Get nested config section (sub-map), returns nil if section not found.
- Get(service string) Config
-
- // Unmarshal unmarshal config data into given struct.
- Unmarshal(out interface{}) error
-}
-
-var (
- dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)")
-)
-
-type Bus struct {
- services []Service
- wg sync.WaitGroup
- enabled []Service
- stop chan interface{}
- rpc *rpc.Server
- rpcConfig *RPCConfig
-}
-
-func (b *Bus) Register(s Service) {
- b.services = append(b.services, s)
-}
-
-func (b *Bus) Services() []Service {
- return b.services
-}
-
-func (b *Bus) Configure(cfg Config) error {
- b.enabled = make([]Service, 0)
-
- for _, s := range b.services {
- segment := cfg.Get(s.Name())
- if segment == nil {
- // no config has been provided for the Service
- logrus.Debugf("%s: no config has been provided", s.Name())
- continue
- }
-
- if enable, err := s.Configure(segment); err != nil {
- return err
- } else if !enable {
- continue
- }
-
- b.enabled = append(b.enabled, s)
- }
-
- return nil
-}
-
-func (b *Bus) Serve() {
- b.rpc = rpc.NewServer()
-
- for _, s := range b.enabled {
- // some candidates might provide net/rpc api for internal communications
- if api := s.RPC(); api != nil {
- b.rpc.RegisterName(s.Name(), api)
- }
-
- b.wg.Add(1)
- go func() {
- defer b.wg.Done()
-
- if err := s.Serve(); err != nil {
- logrus.Errorf("%s.start: %s", s.Name(), err)
- }
- }()
- }
-
- b.wg.Wait()
-}
-
-func (b *Bus) Stop() {
- for _, s := range b.enabled {
- if err := s.Stop(); err != nil {
- logrus.Errorf("%s.stop: %s", s.Name(), err)
- }
- }
-
- b.wg.Wait()
-}
diff --git a/cmd/_____/http/static.go b/cmd/_____/http/static.go
index d7030c3f..b055099f 100644
--- a/cmd/_____/http/static.go
+++ b/cmd/_____/http/static.go
@@ -9,9 +9,7 @@ import (
"strings"
)
-var (
- forbiddenFiles = []string{".php", ".htaccess"}
-)
+var forbiddenFiles = []string{".php", ".htaccess"}
// staticServer serves static files
type staticServer struct {
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go
index 0fd3d7e9..23c36144 100644
--- a/cmd/rr/http/reload.go
+++ b/cmd/rr/http/reload.go
@@ -21,9 +21,9 @@
package http
import (
+ "errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/go-errors/errors"
"github.com/spiral/roadrunner/rpc"
)
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index 63ef0cce..61fee2a1 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -21,9 +21,9 @@
package http
import (
+ "errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "errors"
"github.com/spiral/roadrunner/rpc"
)
diff --git a/config_test.go b/ext/config_test.go
index f4c6246d..fbdde223 100644
--- a/config_test.go
+++ b/ext/config_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package ext
import (
"github.com/stretchr/testify/assert"
diff --git a/errors_test.go b/ext/errors_test.go
index 9b0fa53e..7c9d7a5b 100644
--- a/errors_test.go
+++ b/ext/errors_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package ext
import (
"github.com/stretchr/testify/assert"
diff --git a/pipe_factory_test.go b/ext/pipe_factory_test.go
index ae276ab6..434c31b0 100644
--- a/pipe_factory_test.go
+++ b/ext/pipe_factory_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package ext
import (
"github.com/stretchr/testify/assert"
diff --git a/protocol_test.go b/ext/protocol_test.go
index ed3fe461..f6410ef5 100644
--- a/protocol_test.go
+++ b/ext/protocol_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package ext
import (
"github.com/pkg/errors"
diff --git a/socket_factory_test.go b/ext/socket_factory_test.go
index f6b1350c..2d6108bc 100644
--- a/socket_factory_test.go
+++ b/ext/socket_factory_test.go
@@ -1,4 +1,4 @@
-package roadrunner
+package ext
import (
"github.com/stretchr/testify/assert"
diff --git a/factory.go b/factory.go
index 73896406..7d20016c 100644
--- a/factory.go
+++ b/factory.go
@@ -7,4 +7,7 @@ type Factory interface {
// SpawnWorker creates new worker process based on given command.
// Process must not be started.
SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
+
+ // Close the factory and underlying connections.
+ Close() error
}
diff --git a/cmd/_____/http/data.go b/http/data.go
index e6b8344f..e6b8344f 100644
--- a/cmd/_____/http/data.go
+++ b/http/data.go
diff --git a/cmd/_____/http/request.go b/http/request.go
index fd483744..fd483744 100644
--- a/cmd/_____/http/request.go
+++ b/http/request.go
diff --git a/cmd/_____/http/response.go b/http/response.go
index 2736c4ab..2736c4ab 100644
--- a/cmd/_____/http/response.go
+++ b/http/response.go
diff --git a/http/server.go b/http/server.go
new file mode 100644
index 00000000..d02cfda6
--- /dev/null
+++ b/http/server.go
@@ -0,0 +1 @@
+package http
diff --git a/cmd/_____/http/uploads.go b/http/uploads.go
index 468e8a19..c3b18169 100644
--- a/cmd/_____/http/uploads.go
+++ b/http/uploads.go
@@ -2,7 +2,6 @@ package http
import (
"encoding/json"
- "fmt"
"io"
"io/ioutil"
"mime/multipart"
@@ -80,15 +79,6 @@ func (f *FileUpload) Open(cfg *Config) error {
return err
}
-func wrapUpload(f *multipart.FileHeader) *FileUpload {
- return &FileUpload{
- Name: f.Filename,
- MimeType: f.Header.Get("Content-Type"),
- Error: UploadErrorOK,
- header: f,
- }
-}
-
type fileTree map[string]interface{}
func (d fileTree) push(k string, v []*FileUpload) {
@@ -192,7 +182,16 @@ func parseUploads(r *http.Request) (*Uploads, error) {
return u, nil
}
-// exists if file exists. by osutils; todo: better?
+func wrapUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ MimeType: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+// exists if file exists.
func exists(path string) bool {
_, err := os.Stat(path)
if err == nil {
@@ -203,5 +202,5 @@ func exists(path string) bool {
return false
}
- panic(fmt.Errorf("unable to stat path %q; %v", path, err))
+ return false
}
diff --git a/pipe_factory.go b/pipe_factory.go
index 1ebcc69d..d6fe0420 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -61,3 +61,8 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
w.state.set(StateReady)
return w, nil
}
+
+// Close the factory.
+func (f *PipeFactory) Close() error {
+ return nil
+}
diff --git a/rpc/service.go b/rpc/service.go
index 61a9a1a3..b461bdc2 100644
--- a/rpc/service.go
+++ b/rpc/service.go
@@ -62,7 +62,7 @@ func (s *Service) Serve() error {
return nil
}
-// Stop stop Service Service.
+// Close stop Service Service.
func (s *Service) Stop() error {
close(s.stop)
return nil
diff --git a/server.go b/server.go
index f6b82bad..9a24cb39 100644
--- a/server.go
+++ b/server.go
@@ -16,14 +16,14 @@ const (
// Service manages pool creation and swapping.
type Server struct {
- // observes pool events (can be attached to multiple pools at the same time)
- observer func(event int, ctx interface{})
-
// worker command creator
cmd func() *exec.Cmd
- // pool behaviour
- cfg Config
+ // defines server wide configuration, behaviour and timeouts.
+ config ServerConfig
+
+ // observes pool events (can be attached to multiple pools at the same time)
+ observer func(event int, ctx interface{})
// creates and connects to workers
factory Factory
@@ -31,6 +31,9 @@ type Server struct {
// protects pool while the switch
mu sync.Mutex
+ // pool behaviour
+ cfg Config
+
// currently active pool instance
pool Pool
}
@@ -128,12 +131,23 @@ func (r *Server) Destroy() {
r.pool = nil
}
-func (r *Server) Start() {
- // ????
+// Start the server underlying worker pool and factory.
+func (r *Server) Start() error {
+ if r.factory != nil {
+ //todo: already have started
+ return nil
+ }
+
+ return nil
}
+// Stop the server and close underlying factory.
func (r *Server) Stop() {
- // stop factory?
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ r.factory.Close()
+ r.factory = nil
}
// throw invokes event handler if any.
diff --git a/server_config.go b/server_config.go
index 2816a70b..7e0dcbe9 100644
--- a/server_config.go
+++ b/server_config.go
@@ -1,14 +1,14 @@
package roadrunner
import (
- "time"
- "strings"
- "net"
"errors"
+ "net"
+ "strings"
+ "time"
)
const (
- FactoryPipes = iota
+ FactoryPipes = iota
FactorySocket
)
@@ -18,8 +18,8 @@ type ServerConfig struct {
// This config section must not change on re-configuration.
Relay string
- // FactoryTimeout defines for how long socket factory will be waiting for worker connection. For socket factory only.
- // This config section must not change on re-configuration.
+ // FactoryTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration.
FactoryTimeout time.Duration
// Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change
diff --git a/service/registry.go b/service/registry.go
index d4e2ff12..659ece8b 100644
--- a/service/registry.go
+++ b/service/registry.go
@@ -34,7 +34,7 @@ type Registry interface {
// Serve all configured services. Non blocking.
Serve() error
- // Stop all active services.
+ // Close all active services.
Stop() error
}
@@ -47,7 +47,7 @@ type Service interface {
// Serve serves Service.
Serve() error
- // Stop stop Service Service.
+ // Close stop Service Service.
Stop() error
}
@@ -156,7 +156,7 @@ func (r *registry) Serve() error {
return nil
}
-// Stop all active services.
+// Close all active services.
func (r *registry) Stop() error {
return nil
}
diff --git a/state.go b/state.go
index d1068ab3..6c27c4c1 100644
--- a/state.go
+++ b/state.go
@@ -28,9 +28,17 @@ const (
StateReady
// StateWorking - working on given payload.
StateWorking
- // StateStopped - process has been terminated
+
+ // StateDestructing process is being destructed.
+ StateDestructing
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateStopped - process has been terminated.
StateStopped
- // StateErrored - error state (can't be used)
+
+ // StateErrored - error state (can't be used).
StateErrored
)
diff --git a/static_pool.go b/static_pool.go
index f28cad9e..b6bb6efa 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -171,7 +171,7 @@ func (p *StaticPool) release(w *Worker) {
}
// replaceWorker replaces dead or expired worker with new instance.
-func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
+func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) error {
go p.destroyWorker(w)
if nw, err := p.createWorker(); err != nil {
@@ -181,9 +181,12 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
// possible situation when major error causes all PHP scripts to die (for example dead DB)
p.throw(EventPoolError, fmt.Errorf("all workers are dead"))
}
+ return err
} else {
p.free <- nw
}
+
+ return nil
}
// destroyWorker destroys workers and removes it from the pool.
@@ -226,8 +229,19 @@ func (p *StaticPool) createWorker() (*Worker, error) {
p.throw(EventWorkerCreate, w)
go func(w *Worker) {
- if err := w.Wait(); err != nil {
- p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ err := w.Wait()
+
+ // worker have died unexpectedly,
+ // pool should attempt to replace it with alive version safely
+ if w.state.Value() != StateStopped {
+ if err != nil {
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
+
+ // attempting to replace worker
+ if err := p.replaceWorker(w, err); err != nil {
+ p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err))
+ }
}
}(w)
diff --git a/static_pool_test.go b/static_pool_test.go
index a89cec0a..7e2315d7 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -2,13 +2,13 @@ package roadrunner
import (
"github.com/stretchr/testify/assert"
- "log"
"os/exec"
"runtime"
- "strconv"
- "sync"
"testing"
"time"
+ "strconv"
+ "log"
+ "sync"
)
var cfg = Config{
@@ -162,73 +162,112 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Nil(t, res)
}
-func Test_StaticPool_AllocateTimeout(t *testing.T) {
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
- NumWorkers: 1,
- AllocateTimeout: time.Millisecond * 50,
- DestroyTimeout: time.Second,
- },
+ cfg,
)
+ defer p.Destroy()
assert.NotNil(t, p)
assert.NoError(t, err)
- done := make(chan interface{})
- go func() {
- _, err := p.Exec(&Payload{Body: []byte("100")})
- assert.NoError(t, err)
- close(done)
- }()
-
- // to ensure that worker is already busy
- time.Sleep(time.Millisecond * 10)
-
- _, err = p.Exec(&Payload{Body: []byte("10")})
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "worker timeout")
-
- <-done
- p.Destroy()
-}
-
-func Test_StaticPool_Replace_Worker(t *testing.T) {
- p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") },
- NewPipeFactory(),
- Config{
- NumWorkers: 1,
- MaxExecutions: 1,
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
- },
- )
- defer p.Destroy()
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
- assert.NotNil(t, p)
assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
- var lastPID string
- lastPID = strconv.Itoa(*p.Workers()[0].Pid)
-
- res, err := p.Exec(&Payload{Body: []byte("hello")})
- assert.Equal(t, lastPID, string(res.Body))
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
- for i := 0; i < 10; i++ {
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ destructed := make(chan interface{})
+ p.Observe(func(e int, ctx interface{}) {
+ if err, ok := ctx.(error); ok {
+ assert.Contains(t, err.Error(), "exit status 1")
+ close(destructed)
+ }
+ })
- assert.NoError(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ // killing random worker and expecting pool to replace it
+ p.workers[0].cmd.Process.Kill()
+ <-destructed
- assert.NotEqual(t, lastPID, string(res.Body))
- lastPID = string(res.Body)
+ for _, w := range p.Workers() {
+ assert.Equal(t, StateReady, w.state.Value())
}
}
+//
+//func Test_StaticPool_AllocateTimeout(t *testing.T) {
+// p, err := NewPool(
+// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") },
+// NewPipeFactory(),
+// Config{
+// NumWorkers: 1,
+// AllocateTimeout: time.Millisecond * 50,
+// DestroyTimeout: time.Second,
+// },
+// )
+//
+// assert.NotNil(t, p)
+// assert.NoError(t, err)
+//
+// done := make(chan interface{})
+// go func() {
+// _, err := p.Exec(&Payload{Body: []byte("100")})
+// assert.NoError(t, err)
+// close(done)
+// }()
+//
+// // to ensure that worker is already busy
+// time.Sleep(time.Millisecond * 10)
+//
+// _, err = p.Exec(&Payload{Body: []byte("10")})
+// assert.Error(t, err)
+// assert.Contains(t, err.Error(), "worker timeout")
+//
+// <-done
+// p.Destroy()
+//}
+//
+//func Test_StaticPool_Replace_Worker(t *testing.T) {
+// p, err := NewPool(
+// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") },
+// NewPipeFactory(),
+// Config{
+// NumWorkers: 1,
+// MaxExecutions: 1,
+// AllocateTimeout: time.Second,
+// DestroyTimeout: time.Second,
+// },
+// )
+// defer p.Destroy()
+//
+// assert.NotNil(t, p)
+// assert.NoError(t, err)
+//
+// var lastPID string
+// lastPID = strconv.Itoa(*p.Workers()[0].Pid)
+//
+// res, err := p.Exec(&Payload{Body: []byte("hello")})
+// assert.Equal(t, lastPID, string(res.Body))
+//
+// for i := 0; i < 10; i++ {
+// res, err := p.Exec(&Payload{Body: []byte("hello")})
+//
+// assert.NoError(t, err)
+// assert.NotNil(t, res)
+// assert.NotNil(t, res.Body)
+// assert.Nil(t, res.Context)
+//
+// assert.NotEqual(t, lastPID, string(res.Body))
+// lastPID = string(res.Body)
+// }
+//}
+
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := NewPool(
diff --git a/worker.go b/worker.go
index 6bd317d8..ab0af052 100644
--- a/worker.go
+++ b/worker.go
@@ -110,9 +110,16 @@ func (w *Worker) Wait() error {
}
if w.endState.Success() {
+ w.state.set(StateStopped)
return nil
}
+ if w.state.Value() != StateDestructing {
+ w.state.set(StateErrored)
+ } else {
+ w.state.set(StateStopped)
+ }
+
if w.err.Len() != 0 {
return errors.New(w.err.String())
}
@@ -123,6 +130,17 @@ func (w *Worker) Wait() error {
// Stop sends soft termination command to the worker and waits for process completion.
func (w *Worker) Stop() error {
+ return w.doStop(StateStopping)
+}
+
+// Destroy is identical to stop command but does mark workers with different state. Destroyed workers won't
+// throw error state on completion of process destruction (exit status).
+func (w *Worker) Destroy() error {
+ return w.doStop(StateDestructing)
+}
+
+// actual stopping.
+func (w *Worker) doStop(state int64) error {
select {
case <-w.waitDone:
return nil
@@ -130,7 +148,7 @@ func (w *Worker) Stop() error {
w.mu.Lock()
defer w.mu.Unlock()
- w.state.set(StateInactive)
+ w.state.set(state)
err := sendPayload(w.rl, &stopCommand{Stop: true})
<-w.waitDone
@@ -145,7 +163,7 @@ func (w *Worker) Kill() error {
case <-w.waitDone:
return nil
default:
- w.state.set(StateInactive)
+ w.state.set(StateDestructing)
err := w.cmd.Process.Signal(os.Kill)
<-w.waitDone
@@ -172,7 +190,6 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
defer w.state.registerExec()
rsp, err = w.execPayload(rqs)
-
if err != nil {
if _, ok := err.(JobError); !ok {
w.state.set(StateErrored)
@@ -196,9 +213,7 @@ func (w *Worker) start() error {
go func() {
w.endState, _ = w.cmd.Process.Wait()
if w.waitDone != nil {
- w.state.set(StateStopped)
close(w.waitDone)
-
if w.rl != nil {
w.mu.Lock()
defer w.mu.Unlock()