diff options
author | Wolfy-J <[email protected]> | 2018-06-10 17:53:13 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 17:53:13 +0300 |
commit | 4fd4c7a1e8194287249fa59252afc2cd260d5643 (patch) | |
tree | 7f2f3872ff13ff063acca14d4294b4d299c3dea2 | |
parent | 094a4c211022b9446ef988c74c546ad6efb09722 (diff) |
rr is working now
-rw-r--r-- | cmd/rr/.rr.yaml | 18 | ||||
-rw-r--r-- | cmd/rr/cmd/root.go | 5 | ||||
-rw-r--r-- | cmd/rr/cmd/serve.go | 2 | ||||
-rw-r--r-- | cmd/rr/main.go | 6 | ||||
-rw-r--r-- | config.go | 6 | ||||
-rw-r--r-- | http/service.go | 12 | ||||
-rw-r--r-- | server.go | 92 | ||||
-rw-r--r-- | service/container.go | 19 | ||||
-rw-r--r-- | static/service.go | 22 | ||||
-rw-r--r-- | static_pool.go | 2 |
10 files changed, 79 insertions, 105 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index b7da7fac..34475da7 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -25,7 +25,7 @@ http: # http worker pool configuration. workers: # php worker command. - command: "php /Users/wolfy-j/Projects/phpapp/webroot/index.php rr pipes --no-ansi" + command: "php c:/GoProj/phpapp/webroot/index.php rr pipes --no-ansi" # connection method (pipes, tcp://:9000, unix://socket.unix). relay: "pipes" @@ -33,26 +33,24 @@ http: # worker pool configuration. pool: # number of workers to be serving. - numWorkers: 1 + numWorkers: 2 # maximum jobs per worker, 0 - unlimited. maxJobs: 0 - # worker allocation timeouts. - timeouts: - # for how long worker is allowed to be bootstrapped. - allocateTimeout: 6000000 + # for how long worker is allowed to be bootstrapped. + allocateTimeout: 6000000 - # amount of time given to worker to gracefully destruct itself. - destroyTimeout: 6000000 + # amount of time given to worker to gracefully destruct itself. + destroyTimeout: 6000000 # static file serving. static: # serve http static files enable: true - # root directory for static file (http would not serve .php and .htacess files). - dir: "/Users/wolfy-j/Projects/phpapp/webroot" + # root directory for static file (http would not serve .php and .htaccess files). + dir: "c:/GoProj/phpapp/webroot" # list of extensions for forbid for serving. forbid: [".php", ".htaccess"]
\ No newline at end of file diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index ca83164e..ea7c8a3b 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -53,7 +53,7 @@ var ( // This is called by main.main(). It only needs to happen once to the CLI. func Execute() { if err := CLI.Execute(); err != nil { - utils.Printf("Error: <red>%s</reset>\n", err) + utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) os.Exit(1) } } @@ -69,7 +69,8 @@ func init() { if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { if err := Container.Configure(cfg); err != nil { - panic(err) + utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err) + os.Exit(1) } } }) diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go index b04ea4b7..334771ae 100644 --- a/cmd/rr/cmd/serve.go +++ b/cmd/rr/cmd/serve.go @@ -46,4 +46,6 @@ func serveHandler(cmd *cobra.Command, args []string) error { <-stopSignal Container.Stop() + + return nil } diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 3d15924b..ed47b2ed 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -36,13 +36,13 @@ import ( func main() { // provides ability to make local connection to services - rr.Container.Register(rpc.Name, new(rpc.Service)) + rr.Container.Register(rpc.Name, &rpc.Service{}) // http serving - rr.Container.Register(http.Name, new(http.Service)) + rr.Container.Register(http.Name, &http.Service{}) // serving static files - rr.Container.Register(static.Name, new(static.Service)) + rr.Container.Register(static.Name, &static.Service{}) // you can register additional commands using cmd.CLI rr.Execute() @@ -28,15 +28,15 @@ type Config struct { // Reconfigure returns error if cfg not valid func (cfg *Config) Valid() error { if cfg.NumWorkers == 0 { - return fmt.Errorf("cfg.NumWorkers must be set") + return fmt.Errorf("pool.NumWorkers must be set") } if cfg.AllocateTimeout == 0 { - return fmt.Errorf("cfg.AllocateTimeout must be set") + return fmt.Errorf("pool.AllocateTimeout must be set") } if cfg.DestroyTimeout == 0 { - return fmt.Errorf("cfg.DestroyTimeout must be set") + return fmt.Errorf("pool.DestroyTimeout must be set") } return nil diff --git a/http/service.go b/http/service.go index bf25667d..dc996147 100644 --- a/http/service.go +++ b/http/service.go @@ -51,16 +51,15 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (bool, erro } s.cfg = config + + // todo: RPC + return true, nil } // Serve serves the service. func (s *Service) Serve() error { rr := roadrunner.NewServer(s.cfg.Workers) - if err := rr.Start(); err != nil { - return err - } - defer s.rr.Stop() s.rr = rr s.srv = &Server{cfg: s.cfg, rr: s.rr} @@ -75,6 +74,11 @@ func (s *Service) Serve() error { s.http.Handler = s } + if err := rr.Start(); err != nil { + return err + } + defer s.rr.Stop() + if err := s.http.ListenAndServe(); err != nil { return err } @@ -54,52 +54,6 @@ func (srv *Server) Listen(l func(event int, ctx interface{})) { srv.listener = l } -// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory -// and relay settings. -func (srv *Server) Reconfigure(cfg *ServerConfig) error { - srv.mu.Lock() - if !srv.started { - srv.cfg = cfg - srv.mu.Unlock() - return nil - } - srv.mu.Unlock() - - if srv.cfg.Differs(cfg) { - return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") - } - - srv.mu.Lock() - previous := srv.pool - srv.mu.Unlock() - - pool, err := NewPool(cfg.makeCommand(), srv.factory, cfg.Pool) - if err != nil { - return err - } - - srv.mu.Lock() - srv.cfg.Pool, srv.pool = cfg.Pool, pool - srv.pool.Listen(srv.poolListener) - srv.mu.Unlock() - - srv.throw(EventPoolConstruct, pool) - - if previous != nil { - go func(previous Pool) { - srv.throw(EventPoolDestruct, previous) - previous.Destroy() - }(previous) - } - - return nil -} - -// Reset resets the state of underlying pool and rebuilds all of it's workers. -func (srv *Server) Reset() error { - return srv.Reconfigure(srv.cfg) -} - // Start underlying worker pool, configure factory and command provider. func (srv *Server) Start() (err error) { srv.mu.Lock() @@ -151,6 +105,52 @@ func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) { return pool.Exec(rqs) } +// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory +// and relay settings. +func (srv *Server) Reconfigure(cfg *ServerConfig) error { + srv.mu.Lock() + if !srv.started { + srv.cfg = cfg + srv.mu.Unlock() + return nil + } + srv.mu.Unlock() + + if srv.cfg.Differs(cfg) { + return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") + } + + srv.mu.Lock() + previous := srv.pool + srv.mu.Unlock() + + pool, err := NewPool(cfg.makeCommand(), srv.factory, cfg.Pool) + if err != nil { + return err + } + + srv.mu.Lock() + srv.cfg.Pool, srv.pool = cfg.Pool, pool + srv.pool.Listen(srv.poolListener) + srv.mu.Unlock() + + srv.throw(EventPoolConstruct, pool) + + if previous != nil { + go func(previous Pool) { + srv.throw(EventPoolDestruct, previous) + previous.Destroy() + }(previous) + } + + return nil +} + +// Reset resets the state of underlying pool and rebuilds all of it's workers. +func (srv *Server) Reset() error { + return srv.Reconfigure(srv.cfg) +} + // Workers returns worker list associated with the server pool. func (srv *Server) Workers() (workers []*Worker) { p := srv.Pool() diff --git a/service/container.go b/service/container.go index 0e89f224..c47e0fd2 100644 --- a/service/container.go +++ b/service/container.go @@ -65,7 +65,7 @@ func (c *container) Register(name string, service Service) { status: StatusRegistered, }) - c.log.Debugf("%s: registered", color.GreenString(name)) + c.log.Debugf("%s: registered", color.YellowString(name)) } // Check hasStatus svc has been registered. @@ -98,23 +98,20 @@ func (c *container) Get(target string) (svc Service, status int) { // Configure configures all underlying services with given configuration. func (c *container) Configure(cfg Config) error { - c.mu.Lock() - defer c.mu.Unlock() - for _, e := range c.services { if e.getStatus() >= StatusConfigured { - return fmt.Errorf("service %s has already been configured", color.GreenString(e.name)) + return fmt.Errorf("service %s has already been configured", color.RedString(e.name)) } segment := cfg.Get(e.name) if segment == nil { - c.log.Debugf("%s: no config has been provided", color.GreenString(e.name)) + c.log.Debugf("%s: no config has been provided", color.YellowString(e.name)) continue } ok, err := e.svc.Configure(segment, c) if err != nil { - return errors.Wrap(err, fmt.Sprintf("%s", color.GreenString(e.name))) + return errors.Wrap(err, fmt.Sprintf("%s", color.RedString(e.name))) } else if ok { e.setStatus(StatusConfigured) } @@ -131,7 +128,6 @@ func (c *container) Serve() error { ) defer close(done) - c.mu.Lock() for _, e := range c.services { if e.hasStatus(StatusConfigured) { numServing ++ @@ -145,12 +141,10 @@ func (c *container) Serve() error { defer e.setStatus(StatusStopped) if err := e.svc.Serve(); err != nil { - c.log.Errorf("%s: %s", color.GreenString(e.name), err) - done <- errors.Wrap(err, fmt.Sprintf("%s", color.GreenString(e.name))) + done <- errors.Wrap(err, fmt.Sprintf("%s", color.RedString(e.name))) } }(e) } - c.mu.Unlock() for i := 0; i < numServing; i++ { result := <-done @@ -167,9 +161,6 @@ func (c *container) Serve() error { // Stop sends stop command to all running services. func (c *container) Stop() { - c.mu.Lock() - defer c.mu.Unlock() - for _, e := range c.services { if e.hasStatus(StatusServing) { e.svc.Stop() diff --git a/static/service.go b/static/service.go index 6be7f20f..03a18ba8 100644 --- a/static/service.go +++ b/static/service.go @@ -1,9 +1,7 @@ package static import ( - "github.com/sirupsen/logrus" "net/http" - "os" "path" "strings" rrttp "github.com/spiral/roadrunner/http" @@ -15,9 +13,6 @@ const Name = "static" // Service serves static files. Potentially convert into middleware? type Service struct { - // Logger is associated debug and error logger. Can be empty. - Logger *logrus.Logger - // server configuration (location, forbidden files and etc) cfg *Config @@ -52,10 +47,6 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bo if h, ok := h.(*rrttp.Service); ok { h.Add(s) } - } else { - if s.Logger != nil { - s.Logger.Warningf("no http service found") - } } return true, nil @@ -71,7 +62,6 @@ func (s *Service) Serve() error { // Stop stops the service. func (s *Service) Stop() { - //todo: this is not safe (TODO CHECK IT?) close(s.done) } @@ -84,29 +74,17 @@ func (s *Service) Handle(w http.ResponseWriter, r *http.Request) bool { fPath = path.Clean(fPath) if s.cfg.Forbids(fPath) { - if s.Logger != nil { - s.Logger.Warningf("attempt to access forbidden file %s", fPath) - } return false } f, err := s.root.Open(fPath) if err != nil { - if !os.IsNotExist(err) { - if s.Logger != nil { - s.Logger.Error(err) - } - } - return false } defer f.Close() d, err := f.Stat() if err != nil { - if s.Logger != nil { - s.Logger.Error(err) - } return false } diff --git a/static_pool.go b/static_pool.go index a2d30679..887904b8 100644 --- a/static_pool.go +++ b/static_pool.go @@ -43,7 +43,7 @@ type StaticPool struct { // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) { if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "config error") + return nil, errors.Wrap(err, "config") } p := &StaticPool{ |