summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-10 17:53:13 +0300
committerWolfy-J <[email protected]>2018-06-10 17:53:13 +0300
commit4fd4c7a1e8194287249fa59252afc2cd260d5643 (patch)
tree7f2f3872ff13ff063acca14d4294b4d299c3dea2
parent094a4c211022b9446ef988c74c546ad6efb09722 (diff)
rr is working now
-rw-r--r--cmd/rr/.rr.yaml18
-rw-r--r--cmd/rr/cmd/root.go5
-rw-r--r--cmd/rr/cmd/serve.go2
-rw-r--r--cmd/rr/main.go6
-rw-r--r--config.go6
-rw-r--r--http/service.go12
-rw-r--r--server.go92
-rw-r--r--service/container.go19
-rw-r--r--static/service.go22
-rw-r--r--static_pool.go2
10 files changed, 79 insertions, 105 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index b7da7fac..34475da7 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -25,7 +25,7 @@ http:
# http worker pool configuration.
workers:
# php worker command.
- command: "php /Users/wolfy-j/Projects/phpapp/webroot/index.php rr pipes --no-ansi"
+ command: "php c:/GoProj/phpapp/webroot/index.php rr pipes --no-ansi"
# connection method (pipes, tcp://:9000, unix://socket.unix).
relay: "pipes"
@@ -33,26 +33,24 @@ http:
# worker pool configuration.
pool:
# number of workers to be serving.
- numWorkers: 1
+ numWorkers: 2
# maximum jobs per worker, 0 - unlimited.
maxJobs: 0
- # worker allocation timeouts.
- timeouts:
- # for how long worker is allowed to be bootstrapped.
- allocateTimeout: 6000000
+ # for how long worker is allowed to be bootstrapped.
+ allocateTimeout: 6000000
- # amount of time given to worker to gracefully destruct itself.
- destroyTimeout: 6000000
+ # amount of time given to worker to gracefully destruct itself.
+ destroyTimeout: 6000000
# static file serving.
static:
# serve http static files
enable: true
- # root directory for static file (http would not serve .php and .htacess files).
- dir: "/Users/wolfy-j/Projects/phpapp/webroot"
+ # root directory for static file (http would not serve .php and .htaccess files).
+ dir: "c:/GoProj/phpapp/webroot"
# list of extensions for forbid for serving.
forbid: [".php", ".htaccess"] \ No newline at end of file
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index ca83164e..ea7c8a3b 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -53,7 +53,7 @@ var (
// This is called by main.main(). It only needs to happen once to the CLI.
func Execute() {
if err := CLI.Execute(); err != nil {
- utils.Printf("Error: <red>%s</reset>\n", err)
+ utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
os.Exit(1)
}
}
@@ -69,7 +69,8 @@ func init() {
if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
if err := Container.Configure(cfg); err != nil {
- panic(err)
+ utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
+ os.Exit(1)
}
}
})
diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go
index b04ea4b7..334771ae 100644
--- a/cmd/rr/cmd/serve.go
+++ b/cmd/rr/cmd/serve.go
@@ -46,4 +46,6 @@ func serveHandler(cmd *cobra.Command, args []string) error {
<-stopSignal
Container.Stop()
+
+ return nil
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 3d15924b..ed47b2ed 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -36,13 +36,13 @@ import (
func main() {
// provides ability to make local connection to services
- rr.Container.Register(rpc.Name, new(rpc.Service))
+ rr.Container.Register(rpc.Name, &rpc.Service{})
// http serving
- rr.Container.Register(http.Name, new(http.Service))
+ rr.Container.Register(http.Name, &http.Service{})
// serving static files
- rr.Container.Register(static.Name, new(static.Service))
+ rr.Container.Register(static.Name, &static.Service{})
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/config.go b/config.go
index a20f778b..8fe8dadf 100644
--- a/config.go
+++ b/config.go
@@ -28,15 +28,15 @@ type Config struct {
// Reconfigure returns error if cfg not valid
func (cfg *Config) Valid() error {
if cfg.NumWorkers == 0 {
- return fmt.Errorf("cfg.NumWorkers must be set")
+ return fmt.Errorf("pool.NumWorkers must be set")
}
if cfg.AllocateTimeout == 0 {
- return fmt.Errorf("cfg.AllocateTimeout must be set")
+ return fmt.Errorf("pool.AllocateTimeout must be set")
}
if cfg.DestroyTimeout == 0 {
- return fmt.Errorf("cfg.DestroyTimeout must be set")
+ return fmt.Errorf("pool.DestroyTimeout must be set")
}
return nil
diff --git a/http/service.go b/http/service.go
index bf25667d..dc996147 100644
--- a/http/service.go
+++ b/http/service.go
@@ -51,16 +51,15 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (bool, erro
}
s.cfg = config
+
+ // todo: RPC
+
return true, nil
}
// Serve serves the service.
func (s *Service) Serve() error {
rr := roadrunner.NewServer(s.cfg.Workers)
- if err := rr.Start(); err != nil {
- return err
- }
- defer s.rr.Stop()
s.rr = rr
s.srv = &Server{cfg: s.cfg, rr: s.rr}
@@ -75,6 +74,11 @@ func (s *Service) Serve() error {
s.http.Handler = s
}
+ if err := rr.Start(); err != nil {
+ return err
+ }
+ defer s.rr.Stop()
+
if err := s.http.ListenAndServe(); err != nil {
return err
}
diff --git a/server.go b/server.go
index 84dedb52..6ee2a170 100644
--- a/server.go
+++ b/server.go
@@ -54,52 +54,6 @@ func (srv *Server) Listen(l func(event int, ctx interface{})) {
srv.listener = l
}
-// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory
-// and relay settings.
-func (srv *Server) Reconfigure(cfg *ServerConfig) error {
- srv.mu.Lock()
- if !srv.started {
- srv.cfg = cfg
- srv.mu.Unlock()
- return nil
- }
- srv.mu.Unlock()
-
- if srv.cfg.Differs(cfg) {
- return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
- }
-
- srv.mu.Lock()
- previous := srv.pool
- srv.mu.Unlock()
-
- pool, err := NewPool(cfg.makeCommand(), srv.factory, cfg.Pool)
- if err != nil {
- return err
- }
-
- srv.mu.Lock()
- srv.cfg.Pool, srv.pool = cfg.Pool, pool
- srv.pool.Listen(srv.poolListener)
- srv.mu.Unlock()
-
- srv.throw(EventPoolConstruct, pool)
-
- if previous != nil {
- go func(previous Pool) {
- srv.throw(EventPoolDestruct, previous)
- previous.Destroy()
- }(previous)
- }
-
- return nil
-}
-
-// Reset resets the state of underlying pool and rebuilds all of it's workers.
-func (srv *Server) Reset() error {
- return srv.Reconfigure(srv.cfg)
-}
-
// Start underlying worker pool, configure factory and command provider.
func (srv *Server) Start() (err error) {
srv.mu.Lock()
@@ -151,6 +105,52 @@ func (srv *Server) Exec(rqs *Payload) (rsp *Payload, err error) {
return pool.Exec(rqs)
}
+// Reconfigure re-configures underlying pool and destroys it's previous version if any. Reconfigure will ignore factory
+// and relay settings.
+func (srv *Server) Reconfigure(cfg *ServerConfig) error {
+ srv.mu.Lock()
+ if !srv.started {
+ srv.cfg = cfg
+ srv.mu.Unlock()
+ return nil
+ }
+ srv.mu.Unlock()
+
+ if srv.cfg.Differs(cfg) {
+ return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
+ }
+
+ srv.mu.Lock()
+ previous := srv.pool
+ srv.mu.Unlock()
+
+ pool, err := NewPool(cfg.makeCommand(), srv.factory, cfg.Pool)
+ if err != nil {
+ return err
+ }
+
+ srv.mu.Lock()
+ srv.cfg.Pool, srv.pool = cfg.Pool, pool
+ srv.pool.Listen(srv.poolListener)
+ srv.mu.Unlock()
+
+ srv.throw(EventPoolConstruct, pool)
+
+ if previous != nil {
+ go func(previous Pool) {
+ srv.throw(EventPoolDestruct, previous)
+ previous.Destroy()
+ }(previous)
+ }
+
+ return nil
+}
+
+// Reset resets the state of underlying pool and rebuilds all of it's workers.
+func (srv *Server) Reset() error {
+ return srv.Reconfigure(srv.cfg)
+}
+
// Workers returns worker list associated with the server pool.
func (srv *Server) Workers() (workers []*Worker) {
p := srv.Pool()
diff --git a/service/container.go b/service/container.go
index 0e89f224..c47e0fd2 100644
--- a/service/container.go
+++ b/service/container.go
@@ -65,7 +65,7 @@ func (c *container) Register(name string, service Service) {
status: StatusRegistered,
})
- c.log.Debugf("%s: registered", color.GreenString(name))
+ c.log.Debugf("%s: registered", color.YellowString(name))
}
// Check hasStatus svc has been registered.
@@ -98,23 +98,20 @@ func (c *container) Get(target string) (svc Service, status int) {
// Configure configures all underlying services with given configuration.
func (c *container) Configure(cfg Config) error {
- c.mu.Lock()
- defer c.mu.Unlock()
-
for _, e := range c.services {
if e.getStatus() >= StatusConfigured {
- return fmt.Errorf("service %s has already been configured", color.GreenString(e.name))
+ return fmt.Errorf("service %s has already been configured", color.RedString(e.name))
}
segment := cfg.Get(e.name)
if segment == nil {
- c.log.Debugf("%s: no config has been provided", color.GreenString(e.name))
+ c.log.Debugf("%s: no config has been provided", color.YellowString(e.name))
continue
}
ok, err := e.svc.Configure(segment, c)
if err != nil {
- return errors.Wrap(err, fmt.Sprintf("%s", color.GreenString(e.name)))
+ return errors.Wrap(err, fmt.Sprintf("%s", color.RedString(e.name)))
} else if ok {
e.setStatus(StatusConfigured)
}
@@ -131,7 +128,6 @@ func (c *container) Serve() error {
)
defer close(done)
- c.mu.Lock()
for _, e := range c.services {
if e.hasStatus(StatusConfigured) {
numServing ++
@@ -145,12 +141,10 @@ func (c *container) Serve() error {
defer e.setStatus(StatusStopped)
if err := e.svc.Serve(); err != nil {
- c.log.Errorf("%s: %s", color.GreenString(e.name), err)
- done <- errors.Wrap(err, fmt.Sprintf("%s", color.GreenString(e.name)))
+ done <- errors.Wrap(err, fmt.Sprintf("%s", color.RedString(e.name)))
}
}(e)
}
- c.mu.Unlock()
for i := 0; i < numServing; i++ {
result := <-done
@@ -167,9 +161,6 @@ func (c *container) Serve() error {
// Stop sends stop command to all running services.
func (c *container) Stop() {
- c.mu.Lock()
- defer c.mu.Unlock()
-
for _, e := range c.services {
if e.hasStatus(StatusServing) {
e.svc.Stop()
diff --git a/static/service.go b/static/service.go
index 6be7f20f..03a18ba8 100644
--- a/static/service.go
+++ b/static/service.go
@@ -1,9 +1,7 @@
package static
import (
- "github.com/sirupsen/logrus"
"net/http"
- "os"
"path"
"strings"
rrttp "github.com/spiral/roadrunner/http"
@@ -15,9 +13,6 @@ const Name = "static"
// Service serves static files. Potentially convert into middleware?
type Service struct {
- // Logger is associated debug and error logger. Can be empty.
- Logger *logrus.Logger
-
// server configuration (location, forbidden files and etc)
cfg *Config
@@ -52,10 +47,6 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bo
if h, ok := h.(*rrttp.Service); ok {
h.Add(s)
}
- } else {
- if s.Logger != nil {
- s.Logger.Warningf("no http service found")
- }
}
return true, nil
@@ -71,7 +62,6 @@ func (s *Service) Serve() error {
// Stop stops the service.
func (s *Service) Stop() {
- //todo: this is not safe (TODO CHECK IT?)
close(s.done)
}
@@ -84,29 +74,17 @@ func (s *Service) Handle(w http.ResponseWriter, r *http.Request) bool {
fPath = path.Clean(fPath)
if s.cfg.Forbids(fPath) {
- if s.Logger != nil {
- s.Logger.Warningf("attempt to access forbidden file %s", fPath)
- }
return false
}
f, err := s.root.Open(fPath)
if err != nil {
- if !os.IsNotExist(err) {
- if s.Logger != nil {
- s.Logger.Error(err)
- }
- }
-
return false
}
defer f.Close()
d, err := f.Stat()
if err != nil {
- if s.Logger != nil {
- s.Logger.Error(err)
- }
return false
}
diff --git a/static_pool.go b/static_pool.go
index a2d30679..887904b8 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -43,7 +43,7 @@ type StaticPool struct {
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config error")
+ return nil, errors.Wrap(err, "config")
}
p := &StaticPool{