diff options
-rw-r--r-- | cmd/rr/http/reset.go (renamed from cmd/rr/http/reload.go) | 35 | ||||
-rw-r--r-- | cmd/rr/http/workers.go | 70 | ||||
-rw-r--r-- | http/config.go | 2 | ||||
-rw-r--r-- | http/rpc.go | 130 | ||||
-rw-r--r-- | http/service.go | 23 | ||||
-rw-r--r-- | state.go | 2 |
6 files changed, 141 insertions, 121 deletions
diff --git a/cmd/rr/http/reload.go b/cmd/rr/http/reset.go index 06149065..f5c100cc 100644 --- a/cmd/rr/http/reload.go +++ b/cmd/rr/http/reset.go @@ -24,34 +24,35 @@ import ( "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/rpc" + "github.com/spiral/roadrunner/service" ) func init() { rr.CLI.AddCommand(&cobra.Command{ - Use: "http:reload", + Use: "http:reset", Short: "Reload RoadRunner worker pools for the HTTP service", RunE: reloadHandler, }) } func reloadHandler(cmd *cobra.Command, args []string) error { - if !rr.Container.Has("rpc") { + svc, st := rr.Container.Get(rpc.Name) + if st < service.StatusConfigured { return errors.New("RPC service is not configured") } - return nil - //todo: change - //client, err := rr.Container.Get("rpc").(*rpc.Service).Client() - //if err != nil { - // return err - //} - //defer client.Close() - // - //var r string - //if err := client.Call("http.Reset", true, &r); err != nil { - // return err - //} - // - //rr.Logger.Info("http.service: restarting worker pool") - //return nil + client, err := svc.(*rpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + var r string + if err := client.Call("http.Reset", true, &r); err != nil { + return err + } + + rr.Logger.Info("http: restarting worker pool") + return nil } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index c9aa5b04..62730e25 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -24,6 +24,14 @@ import ( "errors" "github.com/spf13/cobra" rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/rpc" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/http" + "github.com/olekukonko/tablewriter" + "os" + "strconv" + "time" + "github.com/fatih/color" ) func init() { @@ -35,33 +43,47 @@ func init() { } func workersHandler(cmd *cobra.Command, args []string) error { - if !rr.Container.Has("rpc") { + svc, st := rr.Container.Get(rpc.Name) + if st < service.StatusConfigured { return errors.New("RPC service is not configured") } - //todo: change - //client, err := rr.Container.Get("rpc").(*rpc.Service).Client() - //if err != nil { - // return err - //} - //defer client.Close() + client, err := svc.(*rpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + var r http.WorkerList + if err := client.Call("http.Workers", true, &r); err != nil { + panic(err) + } + + tw := tablewriter.NewWriter(os.Stdout) + tw.SetHeader([]string{"PID", "Status", "Handled Jobs", "Alive"}) + + for _, w := range r.Workers { + tw.Append([]string{ + color.YellowString(strconv.Itoa(w.Pid)), + renderStatus(w.Status), + renderJobs(w.NumJobs), + renderAlive(time.Unix(0, w.Created)), + }) + } + + tw.Render() - //var r http.WorkerList - //if err := client.Call("http.Workers", true, &r); err != nil { - // panic(err) - //} - // - //tw := tablewriter.NewWriter(os.Stdout) - //tw.SetHeader([]string{"PID", "Status", "Num Execs"}) - // - //for _, w := range r.Workers { - // tw.Append([]string{ - // strconv.Itoa(w.Pid), - // w.Status, - // strconv.Itoa(int(w.NumExecs)), - // }) - //} - // - //tw.Render() return nil } + +func renderStatus(status string) string { + return status +} + +func renderJobs(number uint64) string { + return strconv.Itoa(int(number)) +} + +func renderAlive(t time.Time) string { + return time.Now().Sub(t).String() +} diff --git a/http/config.go b/http/config.go index abb14d9a..d92b4c60 100644 --- a/http/config.go +++ b/http/config.go @@ -6,7 +6,7 @@ import ( // Configures RoadRunner HTTP server. type Config struct { - // Enable enables http service. + // Enable enables http svc. Enable bool // Address and port to handle as http server. diff --git a/http/rpc.go b/http/rpc.go index d9f911f4..2e33aad7 100644 --- a/http/rpc.go +++ b/http/rpc.go @@ -1,73 +1,61 @@ package http -// -//type rpcServer struct { -// service *Service -//} -// -//// WorkerList contains list of workers. -//type WorkerList struct { -// // Workers is list of workers. -// Workers []utils.Worker `json:"workers"` -//} -// -//// Reset resets underlying RR worker pool and restarts all of it's workers. -//func (rpc *rpcServer) Reset(reset bool, r *string) error { -// if rpc.service.srv == nil { -// return errors.New("no http server") -// } -// -// logrus.Info("http: restarting worker pool") -// *r = "OK" -// -// err := rpc.service.srv.rr.Reset() -// if err != nil { -// logrus.Errorf("http: %s", err) -// } -// -// return err -//} -// -//// Workers returns list of active workers and their stats. -//func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { -// if rpc.service.srv == nil { -// return errors.New("no http server") -// } -// -// r.Workers = utils.FetchWorkers(rpc.service.srv.rr) -// return nil -//} -// -//// Worker provides information about specific worker. -//type Worker struct { -// // Pid contains process id. -// Pid int `json:"pid"` -// -// // Status of the worker. -// Status string `json:"status"` -// -// // Number of worker executions. -// NumExecs uint64 `json:"numExecs"` -// -// // Created is unix nano timestamp of worker creation time. -// Created int64 `json:"created"` -// -// // Updated is unix nano timestamp of last worker execution. -// Updated int64 `json:"updated"` -//} -// -//// FetchWorkers fetches list of workers from RR Server. -//func FetchWorkers(srv *roadrunner.Server) (result []Worker) { -// for _, w := range srv.Workers() { -// state := w.State() -// result = append(result, Worker{ -// Pid: *w.Pid, -// Status: state.String(), -// NumExecs: state.NumExecs(), -// Created: w.Created.UnixNano(), -// Updated: state.Updated().UnixNano(), -// }) -// } -// -// return -//} +import ( + "github.com/pkg/errors" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []Worker `json:"workers"` +} + +// Worker provides information about specific worker. +type Worker struct { + // Pid contains process id. + Pid int `json:"pid"` + + // Status of the worker. + Status string `json:"status"` + + // Number of worker executions. + NumJobs uint64 `json:"numExecs"` + + // Created is unix nano timestamp of worker creation time. + Created int64 `json:"created"` + + // Updated is unix nano timestamp of last worker execution. + Updated int64 `json:"updated"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + *r = "OK" + return rpc.svc.srv.rr.Reset() +} + +// Workers returns list of active workers and their stats. +func (rpc *rpcServer) Workers(list bool, r *WorkerList) error { + if rpc.svc.srv == nil { + return errors.New("http server is not running") + } + + for _, w := range rpc.svc.rr.Workers() { + state := w.State() + r.Workers = append(r.Workers, Worker{ + Pid: *w.Pid, + Status: state.String(), + NumJobs: state.NumExecs(), + Created: w.Created.UnixNano(), + Updated: state.Updated().UnixNano(), + }) + } + + return nil +} diff --git a/http/service.go b/http/service.go index dc996147..81ffb8a2 100644 --- a/http/service.go +++ b/http/service.go @@ -5,9 +5,10 @@ import ( "github.com/spiral/roadrunner/service" "context" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/rpc" ) -// Name contains default service name. +// Name contains default svc name. const Name = "http" type Middleware interface { @@ -17,8 +18,11 @@ type Middleware interface { // Service manages rr, http servers. type Service struct { - cfg *Config - listener func(event int, ctx interface{}) + cfg *Config + + // todo: multiple listeners + listener func(event int, ctx interface{}) + middleware []Middleware rr *roadrunner.Server srv *Server @@ -34,7 +38,7 @@ func (s *Service) Listen(o func(event int, ctx interface{})) { s.listener = o } -// Configure must return configure service and return true if service hasStatus enabled. Must return error in case of +// Configure must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. func (s *Service) Configure(cfg service.Config, c service.Container) (bool, error) { config := &Config{} @@ -52,12 +56,17 @@ func (s *Service) Configure(cfg service.Config, c service.Container) (bool, erro s.cfg = config - // todo: RPC + // registering http RPC interface + if r, ok := c.Get(rpc.Name); ok >= service.StatusConfigured { + if h, ok := r.(*rpc.Service); ok { + h.Register(Name, &rpcServer{s}) + } + } return true, nil } -// Serve serves the service. +// Serve serves the svc. func (s *Service) Serve() error { rr := roadrunner.NewServer(s.cfg.Workers) @@ -86,7 +95,7 @@ func (s *Service) Serve() error { return nil } -// Stop stops the service. +// Stop stops the svc. func (s *Service) Stop() { if s.http == nil { return @@ -14,7 +14,7 @@ type State interface { // Value returns state value Value() int64 - // NumExecs shows how many times worker was invoked + // NumJobs shows how many times worker was invoked NumExecs() uint64 // Updated indicates a moment updated last state change |