diff options
author | Wolfy-J <[email protected]> | 2018-06-10 21:17:54 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 21:17:54 +0300 |
commit | 7cc6d00a1c350eb3147ede00802d312d4be94dee (patch) | |
tree | ac1391c831d4366b477e61b57e095a1dfeafbbb7 | |
parent | e9203e05a7f3278a8080d0f69e6640e5d3d1042d (diff) |
debug is not service anymore
-rw-r--r-- | cmd/rr/.rr.yaml | 12 | ||||
-rw-r--r-- | cmd/rr/debug/listener.go | 93 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 10 | ||||
-rw-r--r-- | cmd/rr/main.go | 16 | ||||
-rw-r--r-- | config.go | 4 | ||||
-rw-r--r-- | debug/config.go | 7 | ||||
-rw-r--r-- | debug/service.go | 123 | ||||
-rw-r--r-- | http/rpc.go | 2 | ||||
-rw-r--r-- | http/service.go | 2 | ||||
-rw-r--r-- | state.go | 35 | ||||
-rw-r--r-- | static_pool.go | 4 |
11 files changed, 136 insertions, 172 deletions
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index cb84af54..29888f69 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -33,7 +33,7 @@ http: # worker pool configuration. pool: # number of workers to be serving. - numWorkers: 1 + numWorkers: 16 # maximum jobs per worker, 0 - unlimited. maxJobs: 0 @@ -53,12 +53,4 @@ static: dir: "c:/GoProj/phpapp/webroot" # list of extensions for forbid for serving. - forbid: [".php", ".htaccess"] - -# rr debugging -debug: - # enable debug output - enable: true - - # show individual worker events - verbose: true
\ No newline at end of file + forbid: [".php", ".htaccess"]
\ No newline at end of file diff --git a/cmd/rr/debug/listener.go b/cmd/rr/debug/listener.go new file mode 100644 index 00000000..fd3b95d3 --- /dev/null +++ b/cmd/rr/debug/listener.go @@ -0,0 +1,93 @@ +package debug + +import ( + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner/http" + "github.com/spiral/roadrunner/utils" + "github.com/spiral/roadrunner" +) + +// Listener provide debug callback for system events. With colors! +type listener struct{ logger *logrus.Logger } + +// NewListener creates new debug listener. +func NewListener(logger *logrus.Logger) *listener { + return &listener{logger} +} + +// Listener listens to http events and generates nice looking output. +func (s *listener) Listener(event int, ctx interface{}) { + // http events + switch event { + case http.EventResponse: + log := ctx.(*http.Log) + s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + case http.EventError: + log := ctx.(*http.Log) + + if _, ok := log.Error.(roadrunner.JobError); ok { + s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) + } else { + s.logger.Info(utils.Sprintf( + "%s <white+hb>%s</reset> %s <red>%s</reset>", + statusColor(log.Status), + log.Method, + log.Uri, + log.Error, + )) + } + } + + switch event { + case roadrunner.EventWorkerKill: + w := ctx.(*roadrunner.Worker) + s.logger.Warning(utils.Sprintf( + "<white+hb>worker.%v</reset> <yellow>killed</red>", + *w.Pid, + )) + + case roadrunner.EventWorkerError: + err := ctx.(roadrunner.WorkerError) + s.logger.Error(utils.Sprintf( + "<white+hb>worker.%v</reset> <red>%s</reset>", + *err.Worker.Pid, + err.Caused, + )) + } + + // rr server events + switch event { + case roadrunner.EventServerFailure: + s.logger.Error(utils.Sprintf("<red>server is dead</reset>")) + } + + // pool events + switch event { + case roadrunner.EventPoolConstruct: + s.logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>")) + case roadrunner.EventPoolError: + s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx)) + } +} + +// Serve serves. +func (s *listener) Serve() error { return nil } + +// Stop stops the Listener. +func (s *listener) Stop() {} + +func statusColor(status int) string { + if status < 300 { + return utils.Sprintf("<green>%v</reset>", status) + } + + if status < 400 { + return utils.Sprintf("<cyan>%v</reset>", status) + } + + if status < 500 { + return utils.Sprintf("<yellow>%v</reset>", status) + } + + return utils.Sprintf("<red+hb>%v</reset>", status) +} diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index ddfe337c..3868d748 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -61,7 +61,13 @@ func init() { rr.CLI.AddCommand(workersCommand) } -func workersHandler(cmd *cobra.Command, args []string) error { +func workersHandler(cmd *cobra.Command, args []string) (err error) { + defer func() { + if r, ok := recover().(error); ok { + err = r + } + }() + svc, st := rr.Container.Get(rrpc.Name) if st < service.StatusConfigured { return errors.New("RPC service is not configured") @@ -136,7 +142,7 @@ func renderStatus(status string) string { return status } -func renderJobs(number uint64) string { +func renderJobs(number int64) string { return humanize.Comma(int64(number)) } diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 6065d3d1..945b8fa0 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -32,9 +32,13 @@ import ( // cli plugins _ "github.com/spiral/roadrunner/cmd/rr/http" - "github.com/spiral/roadrunner/debug" + "github.com/spiral/roadrunner/cmd/rr/debug" + + "github.com/spf13/cobra" ) +var debugMode bool + func main() { // provides ability to make local connection to services rr.Container.Register(rpc.Name, &rpc.Service{}) @@ -46,7 +50,15 @@ func main() { rr.Container.Register(static.Name, &static.Service{}) // provides additional verbosity - rr.Container.Register(debug.Name, &debug.Service{Logger: rr.Logger}) + + // debug mode + rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode", ) + cobra.OnInitialize(func() { + if debugMode { + service, _ := rr.Container.Get(http.Name) + service.(*http.Service).AddListener(debug.NewListener(rr.Logger).Listener) + } + }) // you can register additional commands using cmd.CLI rr.Execute() @@ -9,12 +9,12 @@ import ( type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. - NumWorkers uint64 + NumWorkers int64 // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxJobs uint64 + MaxJobs int64 // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. diff --git a/debug/config.go b/debug/config.go deleted file mode 100644 index 54ec9bf5..00000000 --- a/debug/config.go +++ /dev/null @@ -1,7 +0,0 @@ -package debug - -// Config enabled and disabled debug service. -type Config struct { - // Enable must be set to true to enable debugging. - Enable bool -} diff --git a/debug/service.go b/debug/service.go deleted file mode 100644 index 70520614..00000000 --- a/debug/service.go +++ /dev/null @@ -1,123 +0,0 @@ -package debug - -import ( - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/http" - "github.com/spiral/roadrunner/utils" - "github.com/spiral/roadrunner" -) - -// Default service name. -const Name = "debug" - -// Service provide debug callback for system events. With colors! -type Service struct { - // Logger used to flush all debug events. - Logger *logrus.Logger - - cfg *Config -} - -// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Configure(cfg service.Config, c service.Container) (enabled bool, err error) { - config := &Config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { - return false, nil - } - - s.cfg = config - - // registering as middleware - if h, ok := c.Get(http.Name); ok >= service.StatusConfigured { - if h, ok := h.(*http.Service); ok { - h.AddListener(s.listener) - } - } else { - s.Logger.Error("unable to find http server") - } - - return true, nil -} - -// listener listens to http events and generates nice looking output. -func (s *Service) listener(event int, ctx interface{}) { - // http events - switch event { - case http.EventResponse: - log := ctx.(*http.Log) - s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) - case http.EventError: - log := ctx.(*http.Log) - - if _, ok := log.Error.(roadrunner.JobError); ok { - s.Logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.Uri)) - } else { - s.Logger.Info(utils.Sprintf( - "%s <white+hb>%s</reset> %s <red>%s</reset>", - statusColor(log.Status), - log.Method, - log.Uri, - log.Error, - )) - } - } - - switch event { - case roadrunner.EventWorkerKill: - w := ctx.(*roadrunner.Worker) - s.Logger.Warning(utils.Sprintf( - "<white+hb>worker.%v</reset> <yellow>killed</red>", - *w.Pid, - )) - - case roadrunner.EventWorkerError: - err := ctx.(roadrunner.WorkerError) - s.Logger.Error(utils.Sprintf( - "<white+hb>worker.%v</reset> <red>%s</reset>", - *err.Worker.Pid, - err.Caused, - )) - } - - // rr server events - switch event { - case roadrunner.EventServerFailure: - s.Logger.Error(utils.Sprintf("<red>server is dead</reset>")) - } - - // pool events - switch event { - case roadrunner.EventPoolConstruct: - s.Logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>")) - case roadrunner.EventPoolError: - s.Logger.Error(utils.Sprintf("<red>%s</reset>", ctx)) - } -} - -// Serve serves. -func (s *Service) Serve() error { return nil } - -// Stop stops the service. -func (s *Service) Stop() {} - -func statusColor(status int) string { - if status < 300 { - return utils.Sprintf("<green>%v</reset>", status) - } - - if status < 400 { - return utils.Sprintf("<cyan>%v</reset>", status) - } - - if status < 500 { - return utils.Sprintf("<yellow>%v</reset>", status) - } - - return utils.Sprintf("<red+hb>%v</reset>", status) -} diff --git a/http/rpc.go b/http/rpc.go index 2e33aad7..2adb8706 100644 --- a/http/rpc.go +++ b/http/rpc.go @@ -21,7 +21,7 @@ type Worker struct { Status string `json:"status"` // Number of worker executions. - NumJobs uint64 `json:"numExecs"` + NumJobs int64 `json:"numExecs"` // Created is unix nano timestamp of worker creation time. Created int64 `json:"created"` diff --git a/http/service.go b/http/service.go index 7835a652..79141d07 100644 --- a/http/service.go +++ b/http/service.go @@ -113,6 +113,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (s *Service) listener(event int, ctx interface{}) { + // todo: DIE on server failure + for _, l := range s.listeners { l(event, ctx) } @@ -2,7 +2,6 @@ package roadrunner import ( "fmt" - "sync" "sync/atomic" "time" ) @@ -15,7 +14,7 @@ type State interface { Value() int64 // NumJobs shows how many times worker was invoked - NumExecs() uint64 + NumExecs() int64 // Updated indicates a moment updated last state change Updated() time.Time @@ -42,19 +41,18 @@ const ( ) type state struct { - mu sync.RWMutex value int64 - numExecs uint64 - updated time.Time + numExecs int64 + updated int64 } func newState(value int64) *state { - return &state{value: value, updated: time.Now()} + return &state{value: value, updated: time.Now().Unix()} } // String returns current state as string. func (s *state) String() string { - switch s.value { + switch s.Value() { case StateInactive: return "inactive" case StateReady: @@ -72,10 +70,7 @@ func (s *state) String() string { // Value state returns state value func (s *state) Value() int64 { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.value + return atomic.LoadInt64(&s.value) } // IsActive returns true if worker not Inactive or Stopped @@ -86,26 +81,20 @@ func (s *state) IsActive() bool { // Updated indicates a moment updated last state change func (s *state) Updated() time.Time { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.updated + return time.Unix(0, atomic.LoadInt64(&s.updated)) } -func (s *state) NumExecs() uint64 { - return atomic.LoadUint64(&s.numExecs) +func (s *state) NumExecs() int64 { + return atomic.LoadInt64(&s.numExecs) } // change state value (status) func (s *state) set(value int64) { - s.mu.Lock() - defer s.mu.Unlock() - - s.value = value - s.updated = time.Now() + atomic.StoreInt64(&s.value, value) + atomic.StoreInt64(&s.updated, time.Now().Unix()) } // register new execution atomically func (s *state) registerExec() { - atomic.AddUint64(&s.numExecs, 1) + atomic.AddInt64(&s.numExecs, 1) } diff --git a/static_pool.go b/static_pool.go index 9f4aab23..bb418649 100644 --- a/static_pool.go +++ b/static_pool.go @@ -55,7 +55,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er } // constant number of workers simplify logic - for i := uint64(0); i < p.cfg.NumWorkers; i++ { + for i := int64(0); i < p.cfg.NumWorkers; i++ { // to test if worker ready w, err := p.createWorker() if err != nil { @@ -143,7 +143,7 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { // this loop is required to skip issues with dead workers still being in a ring. - for i := uint64(0); i < p.cfg.NumWorkers; i++ { + for i := int64(0); i < p.cfg.NumWorkers; i++ { select { case w = <-p.free: if w.state.Value() == StateReady { |