diff options
-rw-r--r-- | cmd/rr/.rr.yaml | 60 | ||||
-rw-r--r-- | http/rpc.go | 16 | ||||
-rw-r--r-- | http/service.go | 16 | ||||
-rw-r--r-- | pipe_factory.go | 7 | ||||
-rw-r--r-- | service/bus.go | 4 | ||||
-rw-r--r-- | socket_factory.go | 6 | ||||
-rw-r--r-- | static_pool.go | 2 |
7 files changed, 105 insertions, 6 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml new file mode 100644 index 00000000..4cc54ae9 --- /dev/null +++ b/cmd/rr/.rr.yaml @@ -0,0 +1,60 @@ +# rpc bus allows php application and external clients to talk to rr services. +rpc: + # rpc connection DSN. Supported TCP and Unix sockets. + listen: tcp://127.0.0.1:6001 + +# http service configuration. +http: + # set to false to disable http server. + enabled: true + + # http host to listen. + host: 0.0.0.0 + + # http port. + port: 8080 + + # max POST request size, including file uploads. + maxRequest: 1G + + # static file serving. + static: + # when true rr http will be serving static files. + serve: true + + # root directory for static file (http would not serve .php and .htacess files). + root: "c:/goproj/phpapp/webroot" + + # file upload configuration. + uploads: + # directory to store uploaded files before passing to PHP, keep empty to use default system + # temp directory. + tmpDir: + + # list of file extensions which are forbidden for uploading. + forbid: [".php", ".exe", ".bat"] + + # http worker pool configuration. + pool: + # php worker command. + command: "php c:/goproj/phpapp/webroot/index.php rr pipes --no-ansi" + + # connection method (pipes, tcp://:9000, unix://socket.unix). + relay: "pipes" + + # number of active workers. + number: 1 + + # maximum jobs per worker, 0 - unlimited. + maxJobs: 0 + + # worker allocation timeouts. + timeouts: + # for how long socket based relay should await worker connection. + connect: 10 + + # for how long worker is allowed to be bootstrapped. + allocate: 60 + + # amount of time given to worker to gracefully destruct itself. + destroy: 60
\ No newline at end of file diff --git a/http/rpc.go b/http/rpc.go index dcf19b1f..38db9a61 100644 --- a/http/rpc.go +++ b/http/rpc.go @@ -3,6 +3,7 @@ package http import ( "github.com/sirupsen/logrus" "github.com/spiral/roadrunner/utils" + "github.com/pkg/errors" ) type rpcServer struct { @@ -17,14 +18,27 @@ type WorkerList struct { // Reset resets underlying RR worker pool and restarts all of it's workers. func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.service.srv == nil { + return errors.New("no http server") + } + logrus.Info("http: restarting worker pool") *r = "OK" - return rpc.service.srv.rr.Reset() + err := rpc.service.srv.rr.Reset() + if err != nil { + logrus.Errorf("http: %s", err) + } + + return err } // Workers returns list of active workers and their stats. func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { + if rpc.service.srv == nil { + return errors.New("no http server") + } + r.Workers = utils.FetchWorkers(rpc.service.srv.rr) return nil } diff --git a/http/service.go b/http/service.go index 554b79e3..f8e05f4d 100644 --- a/http/service.go +++ b/http/service.go @@ -5,6 +5,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spiral/roadrunner/service" "net/http" + "github.com/spiral/roadrunner" ) type Service struct { @@ -49,6 +50,21 @@ func (s *Service) Serve() error { } defer term() + rr.Observe(func(event int, ctx interface{}) { + switch event { + case roadrunner.EventPoolError: + logrus.Error(ctx) + case roadrunner.EventWorkerCreate: + logrus.Infof("%s - created", ctx) + case roadrunner.EventWorkerError: + logrus.Errorf("%s: %s", ctx.(roadrunner.WorkerError).Worker, ctx.(roadrunner.WorkerError).Error()) + case roadrunner.EventWorkerDestruct: + logrus.Warnf("%s - destructed", ctx) + case roadrunner.EventWorkerKill: + logrus.Warnf("%s - killed", ctx) + } + }) + s.srv = NewServer(s.cfg.httpConfig(), rr) s.http = &http.Server{ Addr: s.cfg.httpAddr(), diff --git a/pipe_factory.go b/pipe_factory.go index 3b2f2f88..1ebcc69d 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -46,8 +46,13 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { if pid, err := fetchPID(w.rl); pid != *w.Pid { go func(w *Worker) { w.Kill() }(w) + if wErr := w.Wait(); wErr != nil { - err = errors.Wrap(wErr, err.Error()) + if _, ok := wErr.(*exec.ExitError); ok { + err = errors.Wrap(wErr, err.Error()) + } else { + err = wErr + } } return nil, errors.Wrap(err, "unable to connect to worker") diff --git a/service/bus.go b/service/bus.go index 66a0847c..8bfb914c 100644 --- a/service/bus.go +++ b/service/bus.go @@ -92,7 +92,7 @@ func (b *Bus) Serve() { defer b.wg.Done() if err := s.Serve(); err != nil { - logrus.Errorf("%s.start: ", s.Name(), err) + logrus.Errorf("%s.start: %s", s.Name(), err) } }() } @@ -117,7 +117,7 @@ func (b *Bus) Stop() { for _, s := range b.enabled { if err := s.Stop(); err != nil { - logrus.Errorf("%s.stop: ", s.Name(), err) + logrus.Errorf("%s.stop: %s", s.Name(), err) } } diff --git a/socket_factory.go b/socket_factory.go index acdc91b1..c7fe639d 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -54,7 +54,11 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { go func(w *Worker) { w.Kill() }(w) if wErr := w.Wait(); wErr != nil { - err = errors.Wrap(wErr, err.Error()) + if _, ok := wErr.(*exec.ExitError); ok { + err = errors.Wrap(wErr, err.Error()) + } else { + err = wErr + } } return nil, errors.Wrap(err, "unable to connect to worker") diff --git a/static_pool.go b/static_pool.go index 0527d024..f28cad9e 100644 --- a/static_pool.go +++ b/static_pool.go @@ -43,7 +43,7 @@ type StaticPool struct { // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) { if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "cfg error") + return nil, errors.Wrap(err, "config error") } p := &StaticPool{ |