diff options
author | Wolfy-J <[email protected]> | 2018-06-05 22:48:27 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-05 22:48:27 +0300 |
commit | 6adaf713b47c9a3ab3a516e21d2d4ecf7f2075d6 (patch) | |
tree | 6bcf1bfea1e2f87a3ae7065612c0df43c90c1cdc | |
parent | 3112f9b58c73773cea972fd79f04d33f8f7d7edd (diff) |
breaking the tests
-rw-r--r-- | cmd/_____/bus.go | 94 | ||||
-rw-r--r-- | cmd/_____/http/static.go | 4 | ||||
-rw-r--r-- | cmd/rr/http/reload.go | 2 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 2 | ||||
-rw-r--r-- | ext/config_test.go (renamed from config_test.go) | 2 | ||||
-rw-r--r-- | ext/errors_test.go (renamed from errors_test.go) | 2 | ||||
-rw-r--r-- | ext/pipe_factory_test.go (renamed from pipe_factory_test.go) | 2 | ||||
-rw-r--r-- | ext/protocol_test.go (renamed from protocol_test.go) | 2 | ||||
-rw-r--r-- | ext/socket_factory_test.go (renamed from socket_factory_test.go) | 2 | ||||
-rw-r--r-- | factory.go | 3 | ||||
-rw-r--r-- | http/data.go (renamed from cmd/_____/http/data.go) | 0 | ||||
-rw-r--r-- | http/request.go (renamed from cmd/_____/http/request.go) | 0 | ||||
-rw-r--r-- | http/response.go (renamed from cmd/_____/http/response.go) | 0 | ||||
-rw-r--r-- | http/server.go | 1 | ||||
-rw-r--r-- | http/uploads.go (renamed from cmd/_____/http/uploads.go) | 23 | ||||
-rw-r--r-- | pipe_factory.go | 5 | ||||
-rw-r--r-- | rpc/service.go | 2 | ||||
-rw-r--r-- | server.go | 30 | ||||
-rw-r--r-- | server_config.go | 12 | ||||
-rw-r--r-- | service/registry.go | 6 | ||||
-rw-r--r-- | state.go | 12 | ||||
-rw-r--r-- | static_pool.go | 20 | ||||
-rw-r--r-- | static_pool_test.go | 147 | ||||
-rw-r--r-- | worker.go | 25 |
24 files changed, 200 insertions, 198 deletions
diff --git a/cmd/_____/bus.go b/cmd/_____/bus.go deleted file mode 100644 index 813a6c3b..00000000 --- a/cmd/_____/bus.go +++ /dev/null @@ -1,94 +0,0 @@ -package _____ - -import ( - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "net/rpc" - "sync" -) - -// Config provides ability to slice configuration sections and unmarshal configuration data into -// given structure. -type Config interface { - // Get nested config section (sub-map), returns nil if section not found. - Get(service string) Config - - // Unmarshal unmarshal config data into given struct. - Unmarshal(out interface{}) error -} - -var ( - dsnError = errors.New("invalid socket DSN (tcp://:6001, unix://sock.unix)") -) - -type Bus struct { - services []Service - wg sync.WaitGroup - enabled []Service - stop chan interface{} - rpc *rpc.Server - rpcConfig *RPCConfig -} - -func (b *Bus) Register(s Service) { - b.services = append(b.services, s) -} - -func (b *Bus) Services() []Service { - return b.services -} - -func (b *Bus) Configure(cfg Config) error { - b.enabled = make([]Service, 0) - - for _, s := range b.services { - segment := cfg.Get(s.Name()) - if segment == nil { - // no config has been provided for the Service - logrus.Debugf("%s: no config has been provided", s.Name()) - continue - } - - if enable, err := s.Configure(segment); err != nil { - return err - } else if !enable { - continue - } - - b.enabled = append(b.enabled, s) - } - - return nil -} - -func (b *Bus) Serve() { - b.rpc = rpc.NewServer() - - for _, s := range b.enabled { - // some candidates might provide net/rpc api for internal communications - if api := s.RPC(); api != nil { - b.rpc.RegisterName(s.Name(), api) - } - - b.wg.Add(1) - go func() { - defer b.wg.Done() - - if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: %s", s.Name(), err) - } - }() - } - - b.wg.Wait() -} - -func (b *Bus) Stop() { - for _, s := range b.enabled { - if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: %s", s.Name(), err) - } - } - - b.wg.Wait() -} diff --git a/cmd/_____/http/static.go b/cmd/_____/http/static.go index d7030c3f..b055099f 100644 --- a/cmd/_____/http/static.go +++ b/cmd/_____/http/static.go @@ -9,9 +9,7 @@ import ( "strings" ) -var ( - forbiddenFiles = []string{".php", ".htaccess"} -) +var forbiddenFiles = []string{".php", ".htaccess"} // staticServer serves static files type staticServer struct { diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reload.go index 0fd3d7e9..23c36144 100644 --- a/cmd/rr/http/reload.go +++ b/cmd/rr/http/reload.go @@ -21,9 +21,9 @@ package http import ( + "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/go-errors/errors" "github.com/spiral/roadrunner/rpc" ) diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index 63ef0cce..61fee2a1 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -21,9 +21,9 @@ package http import ( + "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "errors" "github.com/spiral/roadrunner/rpc" ) diff --git a/config_test.go b/ext/config_test.go index f4c6246d..fbdde223 100644 --- a/config_test.go +++ b/ext/config_test.go @@ -1,4 +1,4 @@ -package roadrunner +package ext import ( "github.com/stretchr/testify/assert" diff --git a/errors_test.go b/ext/errors_test.go index 9b0fa53e..7c9d7a5b 100644 --- a/errors_test.go +++ b/ext/errors_test.go @@ -1,4 +1,4 @@ -package roadrunner +package ext import ( "github.com/stretchr/testify/assert" diff --git a/pipe_factory_test.go b/ext/pipe_factory_test.go index ae276ab6..434c31b0 100644 --- a/pipe_factory_test.go +++ b/ext/pipe_factory_test.go @@ -1,4 +1,4 @@ -package roadrunner +package ext import ( "github.com/stretchr/testify/assert" diff --git a/protocol_test.go b/ext/protocol_test.go index ed3fe461..f6410ef5 100644 --- a/protocol_test.go +++ b/ext/protocol_test.go @@ -1,4 +1,4 @@ -package roadrunner +package ext import ( "github.com/pkg/errors" diff --git a/socket_factory_test.go b/ext/socket_factory_test.go index f6b1350c..2d6108bc 100644 --- a/socket_factory_test.go +++ b/ext/socket_factory_test.go @@ -1,4 +1,4 @@ -package roadrunner +package ext import ( "github.com/stretchr/testify/assert" @@ -7,4 +7,7 @@ type Factory interface { // SpawnWorker creates new worker process based on given command. // Process must not be started. SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) + + // Close the factory and underlying connections. + Close() error } diff --git a/cmd/_____/http/data.go b/http/data.go index e6b8344f..e6b8344f 100644 --- a/cmd/_____/http/data.go +++ b/http/data.go diff --git a/cmd/_____/http/request.go b/http/request.go index fd483744..fd483744 100644 --- a/cmd/_____/http/request.go +++ b/http/request.go diff --git a/cmd/_____/http/response.go b/http/response.go index 2736c4ab..2736c4ab 100644 --- a/cmd/_____/http/response.go +++ b/http/response.go diff --git a/http/server.go b/http/server.go new file mode 100644 index 00000000..d02cfda6 --- /dev/null +++ b/http/server.go @@ -0,0 +1 @@ +package http diff --git a/cmd/_____/http/uploads.go b/http/uploads.go index 468e8a19..c3b18169 100644 --- a/cmd/_____/http/uploads.go +++ b/http/uploads.go @@ -2,7 +2,6 @@ package http import ( "encoding/json" - "fmt" "io" "io/ioutil" "mime/multipart" @@ -80,15 +79,6 @@ func (f *FileUpload) Open(cfg *Config) error { return err } -func wrapUpload(f *multipart.FileHeader) *FileUpload { - return &FileUpload{ - Name: f.Filename, - MimeType: f.Header.Get("Content-Type"), - Error: UploadErrorOK, - header: f, - } -} - type fileTree map[string]interface{} func (d fileTree) push(k string, v []*FileUpload) { @@ -192,7 +182,16 @@ func parseUploads(r *http.Request) (*Uploads, error) { return u, nil } -// exists if file exists. by osutils; todo: better? +func wrapUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + MimeType: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +// exists if file exists. func exists(path string) bool { _, err := os.Stat(path) if err == nil { @@ -203,5 +202,5 @@ func exists(path string) bool { return false } - panic(fmt.Errorf("unable to stat path %q; %v", path, err)) + return false } diff --git a/pipe_factory.go b/pipe_factory.go index 1ebcc69d..d6fe0420 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -61,3 +61,8 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { w.state.set(StateReady) return w, nil } + +// Close the factory. +func (f *PipeFactory) Close() error { + return nil +} diff --git a/rpc/service.go b/rpc/service.go index 61a9a1a3..b461bdc2 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -62,7 +62,7 @@ func (s *Service) Serve() error { return nil } -// Stop stop Service Service. +// Close stop Service Service. func (s *Service) Stop() error { close(s.stop) return nil @@ -16,14 +16,14 @@ const ( // Service manages pool creation and swapping. type Server struct { - // observes pool events (can be attached to multiple pools at the same time) - observer func(event int, ctx interface{}) - // worker command creator cmd func() *exec.Cmd - // pool behaviour - cfg Config + // defines server wide configuration, behaviour and timeouts. + config ServerConfig + + // observes pool events (can be attached to multiple pools at the same time) + observer func(event int, ctx interface{}) // creates and connects to workers factory Factory @@ -31,6 +31,9 @@ type Server struct { // protects pool while the switch mu sync.Mutex + // pool behaviour + cfg Config + // currently active pool instance pool Pool } @@ -128,12 +131,23 @@ func (r *Server) Destroy() { r.pool = nil } -func (r *Server) Start() { - // ???? +// Start the server underlying worker pool and factory. +func (r *Server) Start() error { + if r.factory != nil { + //todo: already have started + return nil + } + + return nil } +// Stop the server and close underlying factory. func (r *Server) Stop() { - // stop factory? + r.mu.Lock() + defer r.mu.Unlock() + + r.factory.Close() + r.factory = nil } // throw invokes event handler if any. diff --git a/server_config.go b/server_config.go index 2816a70b..7e0dcbe9 100644 --- a/server_config.go +++ b/server_config.go @@ -1,14 +1,14 @@ package roadrunner import ( - "time" - "strings" - "net" "errors" + "net" + "strings" + "time" ) const ( - FactoryPipes = iota + FactoryPipes = iota FactorySocket ) @@ -18,8 +18,8 @@ type ServerConfig struct { // This config section must not change on re-configuration. Relay string - // FactoryTimeout defines for how long socket factory will be waiting for worker connection. For socket factory only. - // This config section must not change on re-configuration. + // FactoryTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. FactoryTimeout time.Duration // Pool defines worker pool configuration, number of workers, timeouts and etc. This config section might change diff --git a/service/registry.go b/service/registry.go index d4e2ff12..659ece8b 100644 --- a/service/registry.go +++ b/service/registry.go @@ -34,7 +34,7 @@ type Registry interface { // Serve all configured services. Non blocking. Serve() error - // Stop all active services. + // Close all active services. Stop() error } @@ -47,7 +47,7 @@ type Service interface { // Serve serves Service. Serve() error - // Stop stop Service Service. + // Close stop Service Service. Stop() error } @@ -156,7 +156,7 @@ func (r *registry) Serve() error { return nil } -// Stop all active services. +// Close all active services. func (r *registry) Stop() error { return nil } @@ -28,9 +28,17 @@ const ( StateReady // StateWorking - working on given payload. StateWorking - // StateStopped - process has been terminated + + // StateDestructing process is being destructed. + StateDestructing + + // StateStopping - process is being softly stopped. + StateStopping + + // StateStopped - process has been terminated. StateStopped - // StateErrored - error state (can't be used) + + // StateErrored - error state (can't be used). StateErrored ) diff --git a/static_pool.go b/static_pool.go index f28cad9e..b6bb6efa 100644 --- a/static_pool.go +++ b/static_pool.go @@ -171,7 +171,7 @@ func (p *StaticPool) release(w *Worker) { } // replaceWorker replaces dead or expired worker with new instance. -func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { +func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) error { go p.destroyWorker(w) if nw, err := p.createWorker(); err != nil { @@ -181,9 +181,12 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { // possible situation when major error causes all PHP scripts to die (for example dead DB) p.throw(EventPoolError, fmt.Errorf("all workers are dead")) } + return err } else { p.free <- nw } + + return nil } // destroyWorker destroys workers and removes it from the pool. @@ -226,8 +229,19 @@ func (p *StaticPool) createWorker() (*Worker, error) { p.throw(EventWorkerCreate, w) go func(w *Worker) { - if err := w.Wait(); err != nil { - p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) + err := w.Wait() + + // worker have died unexpectedly, + // pool should attempt to replace it with alive version safely + if w.state.Value() != StateStopped { + if err != nil { + p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } + + // attempting to replace worker + if err := p.replaceWorker(w, err); err != nil { + p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err)) + } } }(w) diff --git a/static_pool_test.go b/static_pool_test.go index a89cec0a..7e2315d7 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,13 +2,13 @@ package roadrunner import ( "github.com/stretchr/testify/assert" - "log" "os/exec" "runtime" - "strconv" - "sync" "testing" "time" + "strconv" + "log" + "sync" ) var cfg = Config{ @@ -162,73 +162,112 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Nil(t, res) } -func Test_StaticPool_AllocateTimeout(t *testing.T) { +func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ - NumWorkers: 1, - AllocateTimeout: time.Millisecond * 50, - DestroyTimeout: time.Second, - }, + cfg, ) + defer p.Destroy() assert.NotNil(t, p) assert.NoError(t, err) - done := make(chan interface{}) - go func() { - _, err := p.Exec(&Payload{Body: []byte("100")}) - assert.NoError(t, err) - close(done) - }() - - // to ensure that worker is already busy - time.Sleep(time.Millisecond * 10) - - _, err = p.Exec(&Payload{Body: []byte("10")}) - assert.Error(t, err) - assert.Contains(t, err.Error(), "worker timeout") - - <-done - p.Destroy() -} - -func Test_StaticPool_Replace_Worker(t *testing.T) { - p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") }, - NewPipeFactory(), - Config{ - NumWorkers: 1, - MaxExecutions: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - defer p.Destroy() + res, err := p.Exec(&Payload{Body: []byte("hello")}) - assert.NotNil(t, p) assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) - var lastPID string - lastPID = strconv.Itoa(*p.Workers()[0].Pid) - - res, err := p.Exec(&Payload{Body: []byte("hello")}) - assert.Equal(t, lastPID, string(res.Body)) + assert.Equal(t, "hello", res.String()) + assert.Equal(t, runtime.NumCPU(), len(p.Workers())) - for i := 0; i < 10; i++ { - res, err := p.Exec(&Payload{Body: []byte("hello")}) + destructed := make(chan interface{}) + p.Observe(func(e int, ctx interface{}) { + if err, ok := ctx.(error); ok { + assert.Contains(t, err.Error(), "exit status 1") + close(destructed) + } + }) - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + // killing random worker and expecting pool to replace it + p.workers[0].cmd.Process.Kill() + <-destructed - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) + for _, w := range p.Workers() { + assert.Equal(t, StateReady, w.state.Value()) } } +// +//func Test_StaticPool_AllocateTimeout(t *testing.T) { +// p, err := NewPool( +// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") }, +// NewPipeFactory(), +// Config{ +// NumWorkers: 1, +// AllocateTimeout: time.Millisecond * 50, +// DestroyTimeout: time.Second, +// }, +// ) +// +// assert.NotNil(t, p) +// assert.NoError(t, err) +// +// done := make(chan interface{}) +// go func() { +// _, err := p.Exec(&Payload{Body: []byte("100")}) +// assert.NoError(t, err) +// close(done) +// }() +// +// // to ensure that worker is already busy +// time.Sleep(time.Millisecond * 10) +// +// _, err = p.Exec(&Payload{Body: []byte("10")}) +// assert.Error(t, err) +// assert.Contains(t, err.Error(), "worker timeout") +// +// <-done +// p.Destroy() +//} +// +//func Test_StaticPool_Replace_Worker(t *testing.T) { +// p, err := NewPool( +// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") }, +// NewPipeFactory(), +// Config{ +// NumWorkers: 1, +// MaxExecutions: 1, +// AllocateTimeout: time.Second, +// DestroyTimeout: time.Second, +// }, +// ) +// defer p.Destroy() +// +// assert.NotNil(t, p) +// assert.NoError(t, err) +// +// var lastPID string +// lastPID = strconv.Itoa(*p.Workers()[0].Pid) +// +// res, err := p.Exec(&Payload{Body: []byte("hello")}) +// assert.Equal(t, lastPID, string(res.Body)) +// +// for i := 0; i < 10; i++ { +// res, err := p.Exec(&Payload{Body: []byte("hello")}) +// +// assert.NoError(t, err) +// assert.NotNil(t, res) +// assert.NotNil(t, res.Body) +// assert.Nil(t, res.Context) +// +// assert.NotEqual(t, lastPID, string(res.Body)) +// lastPID = string(res.Body) +// } +//} + // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewPool( @@ -110,9 +110,16 @@ func (w *Worker) Wait() error { } if w.endState.Success() { + w.state.set(StateStopped) return nil } + if w.state.Value() != StateDestructing { + w.state.set(StateErrored) + } else { + w.state.set(StateStopped) + } + if w.err.Len() != 0 { return errors.New(w.err.String()) } @@ -123,6 +130,17 @@ func (w *Worker) Wait() error { // Stop sends soft termination command to the worker and waits for process completion. func (w *Worker) Stop() error { + return w.doStop(StateStopping) +} + +// Destroy is identical to stop command but does mark workers with different state. Destroyed workers won't +// throw error state on completion of process destruction (exit status). +func (w *Worker) Destroy() error { + return w.doStop(StateDestructing) +} + +// actual stopping. +func (w *Worker) doStop(state int64) error { select { case <-w.waitDone: return nil @@ -130,7 +148,7 @@ func (w *Worker) Stop() error { w.mu.Lock() defer w.mu.Unlock() - w.state.set(StateInactive) + w.state.set(state) err := sendPayload(w.rl, &stopCommand{Stop: true}) <-w.waitDone @@ -145,7 +163,7 @@ func (w *Worker) Kill() error { case <-w.waitDone: return nil default: - w.state.set(StateInactive) + w.state.set(StateDestructing) err := w.cmd.Process.Signal(os.Kill) <-w.waitDone @@ -172,7 +190,6 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { defer w.state.registerExec() rsp, err = w.execPayload(rqs) - if err != nil { if _, ok := err.(JobError); !ok { w.state.set(StateErrored) @@ -196,9 +213,7 @@ func (w *Worker) start() error { go func() { w.endState, _ = w.cmd.Process.Wait() if w.waitDone != nil { - w.state.set(StateStopped) close(w.waitDone) - if w.rl != nil { w.mu.Lock() defer w.mu.Unlock() |