summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/.rr.yaml12
-rw-r--r--cmd/rr/debug/listener.go93
-rw-r--r--cmd/rr/http/workers.go10
-rw-r--r--cmd/rr/main.go16
-rw-r--r--config.go4
-rw-r--r--debug/config.go7
-rw-r--r--debug/service.go123
-rw-r--r--http/rpc.go2
-rw-r--r--http/service.go2
-rw-r--r--state.go35
-rw-r--r--static_pool.go4
11 files changed, 136 insertions, 172 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index cb84af54..29888f69 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -33,7 +33,7 @@ http:
# worker pool configuration.
pool:
# number of workers to be serving.
- numWorkers: 1
+ numWorkers: 16
# maximum jobs per worker, 0 - unlimited.
maxJobs: 0
@@ -53,12 +53,4 @@ static:
dir: "c:/GoProj/phpapp/webroot"
# list of extensions for forbid for serving.
- forbid: [".php", ".htaccess"]
-
-# rr debugging
-debug:
- # enable debug output
- enable: true
-
- # show individual worker events
- verbose: true \ No newline at end of file
+ forbid: [".php", ".htaccess"] \ No newline at end of file
diff --git a/cmd/rr/debug/listener.go b/cmd/rr/debug/listener.go
new file mode 100644
index 00000000..fd3b95d3
--- /dev/null
+++ b/cmd/rr/debug/listener.go
@@ -0,0 +1,93 @@
+package debug
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner/http"
+ "github.com/spiral/roadrunner/utils"
+ "github.com/spiral/roadrunner"
+)
+
+// Listener provide debug callback for system events. With colors!
+type listener struct{ logger *logrus.Logger }
+
+// NewListener creates new debug listener.
+func NewListener(logger *logrus.Logger) *listener {
+ return &listener{logger}
+}
+
+// Listener listens to http events and generates nice looking output.
+func (s *listener) Listener(event int, ctx interface{}) {
+ // http events
+ switch event {
+ case http.EventResponse:
+ log := ctx.(*http.Log)
+ s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ case http.EventError:
+ log := ctx.(*http.Log)
+
+ if _, ok := log.Error.(roadrunner.JobError); ok {
+ s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
+ } else {
+ s.logger.Info(utils.Sprintf(
+ "%s <white+hb>%s</reset> %s <red>%s</reset>",
+ statusColor(log.Status),
+ log.Method,
+ log.Uri,
+ log.Error,
+ ))
+ }
+ }
+
+ switch event {
+ case roadrunner.EventWorkerKill:
+ w := ctx.(*roadrunner.Worker)
+ s.logger.Warning(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>killed</red>",
+ *w.Pid,
+ ))
+
+ case roadrunner.EventWorkerError:
+ err := ctx.(roadrunner.WorkerError)
+ s.logger.Error(utils.Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *err.Worker.Pid,
+ err.Caused,
+ ))
+ }
+
+ // rr server events
+ switch event {
+ case roadrunner.EventServerFailure:
+ s.logger.Error(utils.Sprintf("<red>server is dead</reset>"))
+ }
+
+ // pool events
+ switch event {
+ case roadrunner.EventPoolConstruct:
+ s.logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>"))
+ case roadrunner.EventPoolError:
+ s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
+ }
+}
+
+// Serve serves.
+func (s *listener) Serve() error { return nil }
+
+// Stop stops the Listener.
+func (s *listener) Stop() {}
+
+func statusColor(status int) string {
+ if status < 300 {
+ return utils.Sprintf("<green>%v</reset>", status)
+ }
+
+ if status < 400 {
+ return utils.Sprintf("<cyan>%v</reset>", status)
+ }
+
+ if status < 500 {
+ return utils.Sprintf("<yellow>%v</reset>", status)
+ }
+
+ return utils.Sprintf("<red+hb>%v</reset>", status)
+}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index ddfe337c..3868d748 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -61,7 +61,13 @@ func init() {
rr.CLI.AddCommand(workersCommand)
}
-func workersHandler(cmd *cobra.Command, args []string) error {
+func workersHandler(cmd *cobra.Command, args []string) (err error) {
+ defer func() {
+ if r, ok := recover().(error); ok {
+ err = r
+ }
+ }()
+
svc, st := rr.Container.Get(rrpc.Name)
if st < service.StatusConfigured {
return errors.New("RPC service is not configured")
@@ -136,7 +142,7 @@ func renderStatus(status string) string {
return status
}
-func renderJobs(number uint64) string {
+func renderJobs(number int64) string {
return humanize.Comma(int64(number))
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 6065d3d1..945b8fa0 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -32,9 +32,13 @@ import (
// cli plugins
_ "github.com/spiral/roadrunner/cmd/rr/http"
- "github.com/spiral/roadrunner/debug"
+ "github.com/spiral/roadrunner/cmd/rr/debug"
+
+ "github.com/spf13/cobra"
)
+var debugMode bool
+
func main() {
// provides ability to make local connection to services
rr.Container.Register(rpc.Name, &rpc.Service{})
@@ -46,7 +50,15 @@ func main() {
rr.Container.Register(static.Name, &static.Service{})
// provides additional verbosity
- rr.Container.Register(debug.Name, &debug.Service{Logger: rr.Logger})
+
+ // debug mode
+ rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode", )
+ cobra.OnInitialize(func() {
+ if debugMode {
+ service, _ := rr.Container.Get(http.Name)
+ service.(*http.Service).AddListener(debug.NewListener(rr.Logger).Listener)
+ }
+ })
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/config.go b/config.go
index 8fe8dadf..0ffe29a2 100644
--- a/config.go
+++ b/config.go
@@ -9,12 +9,12 @@ import (
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap.
- NumWorkers uint64
+ NumWorkers int64
// MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
- MaxJobs uint64
+ MaxJobs int64
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task.
diff --git a/debug/config.go b/debug/config.go
deleted file mode 100644
index 54ec9bf5..00000000
--- a/debug/config.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package debug
-
-// Config enabled and disabled debug service.
-type Config struct {
- // Enable must be set to true to enable debugging.
- Enable bool
-}
diff --git a/debug/service.go b/debug/service.go
deleted file mode 100644
index 70520614..00000000
--- a/debug/service.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package debug
-
-import (
- "github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/http"
- "github.com/spiral/roadrunner/utils"
- "github.com/spiral/roadrunner"
-)
-
-// Default service name.
-const Name = "debug"
-
-// Service provide debug callback for system events. With colors!
-type Service struct {
- // Logger used to flush all debug events.
- Logger *logrus.Logger
-
- cfg *Config
-}
-
-// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of
-// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bool, err error) {
- config := &Config{}
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
- return false, nil
- }
-
- s.cfg = config
-
- // registering as middleware
- if h, ok := c.Get(http.Name); ok >= service.StatusConfigured {
- if h, ok := h.(*http.Service); ok {
- h.AddListener(s.listener)
- }
- } else {
- s.Logger.Error("unable to find http server")
- }
-
- return true, nil
-}
-
-// listener listens to http events and generates nice looking output.
-func (s *Service) listener(event int, ctx interface{}) {
- // http events
- switch event {
- case http.EventResponse:
- log := ctx.(*http.Log)
- s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
- case http.EventError:
- log := ctx.(*http.Log)
-
- if _, ok := log.Error.(roadrunner.JobError); ok {
- s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri))
- } else {
- s.Logger.Info(utils.Sprintf(
- "%s <white+hb>%s</reset> %s <red>%s</reset>",
- statusColor(log.Status),
- log.Method,
- log.Uri,
- log.Error,
- ))
- }
- }
-
- switch event {
- case roadrunner.EventWorkerKill:
- w := ctx.(*roadrunner.Worker)
- s.Logger.Warning(utils.Sprintf(
- "<white+hb>worker.%v</reset> <yellow>killed</red>",
- *w.Pid,
- ))
-
- case roadrunner.EventWorkerError:
- err := ctx.(roadrunner.WorkerError)
- s.Logger.Error(utils.Sprintf(
- "<white+hb>worker.%v</reset> <red>%s</reset>",
- *err.Worker.Pid,
- err.Caused,
- ))
- }
-
- // rr server events
- switch event {
- case roadrunner.EventServerFailure:
- s.Logger.Error(utils.Sprintf("<red>server is dead</reset>"))
- }
-
- // pool events
- switch event {
- case roadrunner.EventPoolConstruct:
- s.Logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>"))
- case roadrunner.EventPoolError:
- s.Logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
- }
-}
-
-// Serve serves.
-func (s *Service) Serve() error { return nil }
-
-// Stop stops the service.
-func (s *Service) Stop() {}
-
-func statusColor(status int) string {
- if status < 300 {
- return utils.Sprintf("<green>%v</reset>", status)
- }
-
- if status < 400 {
- return utils.Sprintf("<cyan>%v</reset>", status)
- }
-
- if status < 500 {
- return utils.Sprintf("<yellow>%v</reset>", status)
- }
-
- return utils.Sprintf("<red+hb>%v</reset>", status)
-}
diff --git a/http/rpc.go b/http/rpc.go
index 2e33aad7..2adb8706 100644
--- a/http/rpc.go
+++ b/http/rpc.go
@@ -21,7 +21,7 @@ type Worker struct {
Status string `json:"status"`
// Number of worker executions.
- NumJobs uint64 `json:"numExecs"`
+ NumJobs int64 `json:"numExecs"`
// Created is unix nano timestamp of worker creation time.
Created int64 `json:"created"`
diff --git a/http/service.go b/http/service.go
index 7835a652..79141d07 100644
--- a/http/service.go
+++ b/http/service.go
@@ -113,6 +113,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (s *Service) listener(event int, ctx interface{}) {
+ // todo: DIE on server failure
+
for _, l := range s.listeners {
l(event, ctx)
}
diff --git a/state.go b/state.go
index da4d2b6f..08f5dee8 100644
--- a/state.go
+++ b/state.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"fmt"
- "sync"
"sync/atomic"
"time"
)
@@ -15,7 +14,7 @@ type State interface {
Value() int64
// NumJobs shows how many times worker was invoked
- NumExecs() uint64
+ NumExecs() int64
// Updated indicates a moment updated last state change
Updated() time.Time
@@ -42,19 +41,18 @@ const (
)
type state struct {
- mu sync.RWMutex
value int64
- numExecs uint64
- updated time.Time
+ numExecs int64
+ updated int64
}
func newState(value int64) *state {
- return &state{value: value, updated: time.Now()}
+ return &state{value: value, updated: time.Now().Unix()}
}
// String returns current state as string.
func (s *state) String() string {
- switch s.value {
+ switch s.Value() {
case StateInactive:
return "inactive"
case StateReady:
@@ -72,10 +70,7 @@ func (s *state) String() string {
// Value state returns state value
func (s *state) Value() int64 {
- s.mu.RLock()
- defer s.mu.RUnlock()
-
- return s.value
+ return atomic.LoadInt64(&s.value)
}
// IsActive returns true if worker not Inactive or Stopped
@@ -86,26 +81,20 @@ func (s *state) IsActive() bool {
// Updated indicates a moment updated last state change
func (s *state) Updated() time.Time {
- s.mu.RLock()
- defer s.mu.RUnlock()
-
- return s.updated
+ return time.Unix(0, atomic.LoadInt64(&s.updated))
}
-func (s *state) NumExecs() uint64 {
- return atomic.LoadUint64(&s.numExecs)
+func (s *state) NumExecs() int64 {
+ return atomic.LoadInt64(&s.numExecs)
}
// change state value (status)
func (s *state) set(value int64) {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- s.value = value
- s.updated = time.Now()
+ atomic.StoreInt64(&s.value, value)
+ atomic.StoreInt64(&s.updated, time.Now().Unix())
}
// register new execution atomically
func (s *state) registerExec() {
- atomic.AddUint64(&s.numExecs, 1)
+ atomic.AddInt64(&s.numExecs, 1)
}
diff --git a/static_pool.go b/static_pool.go
index 9f4aab23..bb418649 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -55,7 +55,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
}
// constant number of workers simplify logic
- for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ for i := int64(0); i < p.cfg.NumWorkers; i++ {
// to test if worker ready
w, err := p.createWorker()
if err != nil {
@@ -143,7 +143,7 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// this loop is required to skip issues with dead workers still being in a ring.
- for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ for i := int64(0); i < p.cfg.NumWorkers; i++ {
select {
case w = <-p.free:
if w.state.Value() == StateReady {